This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 8c37b74a20 D205 Support - Providers: Apache to Common (inclusive) (#32226)
8c37b74a20 is described below
commit 8c37b74a208a808d905c1b86d081d69d7a1aa900
Author: D. Ferruzzi <[email protected]>
AuthorDate: Wed Jun 28 11:22:52 2023 -0700
D205 Support - Providers: Apache to Common (inclusive) (#32226)
* D205 Support - Providers: Apache to Common (inclusive)
* CI Fixes
* Correct unintentional Type change.
---
airflow/providers/apache/beam/operators/beam.py | 27 ++++++++++++---------
.../providers/apache/cassandra/sensors/record.py | 4 ----
.../providers/apache/cassandra/sensors/table.py | 4 ----
.../apache/druid/operators/druid_check.py | 1 +
airflow/providers/apache/hdfs/hooks/hdfs.py | 20 +++++++++-------
airflow/providers/apache/hdfs/hooks/webhdfs.py | 1 +
.../providers/apache/hdfs/log/hdfs_task_handler.py | 4 +++-
airflow/providers/apache/hdfs/sensors/hdfs.py | 10 ++++----
airflow/providers/apache/hive/hooks/hive.py | 28 +++++++++++-----------
airflow/providers/apache/hive/macros/hive.py | 2 ++
.../apache/hive/sensors/metastore_partition.py | 10 ++++----
.../apache/hive/transfers/hive_to_mysql.py | 7 +++---
.../apache/hive/transfers/hive_to_samba.py | 3 +--
.../apache/hive/transfers/mssql_to_hive.py | 12 ++++++----
.../apache/hive/transfers/mysql_to_hive.py | 7 ++++--
.../providers/apache/hive/transfers/s3_to_hive.py | 7 ++++--
.../apache/hive/transfers/vertica_to_hive.py | 9 +++----
airflow/providers/apache/kafka/sensors/kafka.py | 6 ++---
airflow/providers/apache/livy/operators/livy.py | 7 +++---
airflow/providers/apache/pinot/hooks/pinot.py | 1 +
airflow/providers/apache/spark/hooks/spark_jdbc.py | 3 +--
airflow/providers/apache/spark/hooks/spark_sql.py | 6 ++---
.../providers/apache/spark/hooks/spark_submit.py | 4 ++--
.../providers/apache/spark/operators/spark_jdbc.py | 7 +++---
.../apache/spark/operators/spark_submit.py | 3 +--
airflow/providers/apprise/hooks/apprise.py | 1 +
airflow/providers/arangodb/hooks/arangodb.py | 3 +--
airflow/providers/asana/hooks/asana.py | 5 +---
airflow/providers/celery/sensors/celery_queue.py | 11 ++++-----
.../providers/cncf/kubernetes/hooks/kubernetes.py | 16 +++++++------
airflow/providers/cncf/kubernetes/operators/pod.py | 1 +
airflow/providers/cncf/kubernetes/triggers/pod.py | 3 ++-
.../providers/cncf/kubernetes/utils/delete_from.py | 5 +---
.../providers/cncf/kubernetes/utils/pod_manager.py | 14 +++++------
.../cncf/kubernetes/utils/xcom_sidecar.py | 6 +----
airflow/providers/common/sql/operators/sql.py | 21 +++++++++-------
36 files changed, 143 insertions(+), 136 deletions(-)
diff --git a/airflow/providers/apache/beam/operators/beam.py b/airflow/providers/apache/beam/operators/beam.py
index 258a89552d..68e6972547 100644
--- a/airflow/providers/apache/beam/operators/beam.py
+++ b/airflow/providers/apache/beam/operators/beam.py
@@ -47,7 +47,8 @@ if TYPE_CHECKING:
class BeamDataflowMixin(metaclass=ABCMeta):
"""
- Helper class to store common, Dataflow specific logic for both
+ Helper class to store common, Dataflow specific logic for both.
+
:class:`~airflow.providers.apache.beam.operators.beam.BeamRunPythonPipelineOperator`,
:class:`~airflow.providers.apache.beam.operators.beam.BeamRunJavaPipelineOperator` and
:class:`~airflow.providers.apache.beam.operators.beam.BeamRunGoPipelineOperator`.
@@ -205,11 +206,13 @@ class BeamBasePipelineOperator(BaseOperator, BeamDataflowMixin, ABC):
class BeamRunPythonPipelineOperator(BeamBasePipelineOperator):
"""
- Launching Apache Beam pipelines written in Python. Note that both
- ``default_pipeline_options`` and ``pipeline_options`` will be merged to specify pipeline
- execution parameter, and ``default_pipeline_options`` is expected to save
- high-level options, for instances, project and zone information, which
- apply to all beam operators in the DAG.
+ Launch Apache Beam pipelines written in Python.
+
+ Note that both ``default_pipeline_options`` and ``pipeline_options``
+ will be merged to specify pipeline execution parameter, and
+ ``default_pipeline_options`` is expected to save high-level options,
+ for instances, project and zone information, which apply to all beam
+ operators in the DAG.
.. seealso::
For more information on how to use this operator, take a look at the guide:
@@ -498,11 +501,13 @@ class BeamRunJavaPipelineOperator(BeamBasePipelineOperator):
class BeamRunGoPipelineOperator(BeamBasePipelineOperator):
"""
- Launching Apache Beam pipelines written in Go. Note that both
- ``default_pipeline_options`` and ``pipeline_options`` will be merged to specify pipeline
- execution parameter, and ``default_pipeline_options`` is expected to save
- high-level options, for instances, project and zone information, which
- apply to all beam operators in the DAG.
+ Launch Apache Beam pipelines written in Go.
+
+ Note that both ``default_pipeline_options`` and ``pipeline_options``
+ will be merged to specify pipeline execution parameter, and
+ ``default_pipeline_options`` is expected to save high-level options,
+ for instances, project and zone information, which apply to all beam
+ operators in the DAG.
.. seealso::
For more information on how to use this operator, take a look at the guide:
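
The merge behavior described in the BeamRunPythonPipelineOperator docstring above can be pictured as a per-key override, with the per-task options winning over the DAG-wide defaults. The sketch below is illustrative only: it assumes plain dict semantics and made-up option names, and is not the provider's implementation.

def merge_pipeline_options(default_pipeline_options, pipeline_options):
    """Return a single options dict, with per-task options taking precedence."""
    merged = dict(default_pipeline_options or {})
    merged.update(pipeline_options or {})
    return merged


if __name__ == "__main__":
    # Hypothetical values, for illustration only.
    defaults = {"project": "my-project", "zone": "us-central1-a"}
    task_options = {"zone": "europe-west1-b", "job_name": "beam-example"}
    print(merge_pipeline_options(defaults, task_options))
    # {'project': 'my-project', 'zone': 'europe-west1-b', 'job_name': 'beam-example'}
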
diff --git a/airflow/providers/apache/cassandra/sensors/record.py b/airflow/providers/apache/cassandra/sensors/record.py
index 1221c6fc37..9f0a766456 100644
--- a/airflow/providers/apache/cassandra/sensors/record.py
+++ b/airflow/providers/apache/cassandra/sensors/record.py
@@ -15,10 +15,6 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-"""
-This module contains sensor that check the existence
-of a record in a Cassandra cluster.
-"""
from __future__ import annotations
from typing import TYPE_CHECKING, Any, Sequence
diff --git a/airflow/providers/apache/cassandra/sensors/table.py b/airflow/providers/apache/cassandra/sensors/table.py
index 60e8594f9b..62bfdaf72c 100644
--- a/airflow/providers/apache/cassandra/sensors/table.py
+++ b/airflow/providers/apache/cassandra/sensors/table.py
@@ -15,10 +15,6 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-"""
-This module contains sensor that check the existence
-of a table in a Cassandra cluster.
-"""
from __future__ import annotations
from typing import TYPE_CHECKING, Any, Sequence
diff --git a/airflow/providers/apache/druid/operators/druid_check.py b/airflow/providers/apache/druid/operators/druid_check.py
index c5767e0bfc..25813384c2 100644
--- a/airflow/providers/apache/druid/operators/druid_check.py
+++ b/airflow/providers/apache/druid/operators/druid_check.py
@@ -26,6 +26,7 @@ from airflow.providers.common.sql.operators.sql import SQLCheckOperator
class DruidCheckOperator(SQLCheckOperator):
"""
This class is deprecated.
+
Please use `airflow.providers.common.sql.operators.sql.SQLCheckOperator`.
"""
diff --git a/airflow/providers/apache/hdfs/hooks/hdfs.py b/airflow/providers/apache/hdfs/hooks/hdfs.py
index 7f6f892b4e..78ed258b3f 100644
--- a/airflow/providers/apache/hdfs/hooks/hdfs.py
+++ b/airflow/providers/apache/hdfs/hooks/hdfs.py
@@ -29,10 +29,12 @@ using `pip install apache-airflow-providers-hdfs==3.2.1` (no constraints)
class HDFSHookException(AirflowException):
"""
- This Exception has been removed and is not functional. Please convert your DAGs to use the
- WebHdfsHook or downgrade the provider to below 4.* if you want to continue using it.
- If you want to use earlier provider you can downgrade to latest released 3.* version
- using `pip install apache-airflow-providers-hdfs==3.2.1` (no constraints).
+ This Exception has been removed and is not functional.
+
+ Please convert your DAGs to use the WebHdfsHook or downgrade the provider
+ to below 4.* if you want to continue using it. If you want to use earlier
+ provider you can downgrade to latest released 3.* version using
+ `pip install apache-airflow-providers-hdfs==3.2.1` (no constraints).
"""
def __init__(self, *args, **kwargs):
@@ -41,10 +43,12 @@ class HDFSHookException(AirflowException):
class HDFSHook(BaseHook):
"""
- This Hook has been removed and is not functional. Please convert your DAGs to use the
- WebHdfsHook or downgrade the provider to below 4.*. if you want to continue using it.
- If you want to use earlier provider you can downgrade to latest released 3.* version
- using `pip install apache-airflow-providers-hdfs==3.2.1` (no constraints).
+ This Hook has been removed and is not functional.
+
+ Please convert your DAGs to use the WebHdfsHook or downgrade the provider
+ to below 4.*. if you want to continue using it. If you want to use earlier
+ provider you can downgrade to latest released 3.* version using
+ `pip install apache-airflow-providers-hdfs==3.2.1` (no constraints).
"""
def __init__(self, *args, **kwargs):
diff --git a/airflow/providers/apache/hdfs/hooks/webhdfs.py b/airflow/providers/apache/hdfs/hooks/webhdfs.py
index f6898ed06d..319a1bd60b 100644
--- a/airflow/providers/apache/hdfs/hooks/webhdfs.py
+++ b/airflow/providers/apache/hdfs/hooks/webhdfs.py
@@ -60,6 +60,7 @@ class WebHDFSHook(BaseHook):
def get_conn(self) -> Any:
"""
Establishes a connection depending on the security mode set via config or environment variable.
+
:return: a hdfscli InsecureClient or KerberosClient object.
"""
connection = self._find_valid_server()
diff --git a/airflow/providers/apache/hdfs/log/hdfs_task_handler.py b/airflow/providers/apache/hdfs/log/hdfs_task_handler.py
index 53d46f954d..648b42e3d9 100644
--- a/airflow/providers/apache/hdfs/log/hdfs_task_handler.py
+++ b/airflow/providers/apache/hdfs/log/hdfs_task_handler.py
@@ -107,7 +107,9 @@ class HdfsTaskHandler(FileTaskHandler, LoggingMixin):
return messages, logs
def _read(self, ti, try_number, metadata=None):
- """Read logs of given task instance and try_number from HDFS.
+ """
+ Read logs of given task instance and try_number from HDFS.
+
If failed, read the log from task instance host machine.
todo: when min airflow version >= 2.6 then remove this method (``_read``)
diff --git a/airflow/providers/apache/hdfs/sensors/hdfs.py b/airflow/providers/apache/hdfs/sensors/hdfs.py
index da7e5e0171..a439c5d236 100644
--- a/airflow/providers/apache/hdfs/sensors/hdfs.py
+++ b/airflow/providers/apache/hdfs/sensors/hdfs.py
@@ -28,10 +28,12 @@ using `pip install apache-airflow-providers-hdfs==3.2.1` (no constraints)
class HdfsSensor(BaseSensorOperator):
"""
- This Sensor has been removed and is not functional. Please convert your DAGs to use the
- WebHdfsSensor or downgrade the provider to below 4.* if you want to continue using it.
- If you want to use earlier provider you can downgrade to latest released 3.* version
- using `pip install apache-airflow-providers-hdfs==3.2.1` (no constraints).
+ This Sensor has been removed and is not functional.
+
+ Please convert your DAGs to use the WebHdfsSensor or downgrade the provider
+ to below 4.* if you want to continue using it. If you want to use earlier
+ provider you can downgrade to latest released 3.* version using
+ `pip install apache-airflow-providers-hdfs==3.2.1` (no constraints).
"""
def __init__(self, *args, **kwargs):
diff --git a/airflow/providers/apache/hive/hooks/hive.py b/airflow/providers/apache/hive/hooks/hive.py
index 945e6ba308..4ad3b9d12c 100644
--- a/airflow/providers/apache/hive/hooks/hive.py
+++ b/airflow/providers/apache/hive/hooks/hive.py
@@ -52,8 +52,7 @@ HIVE_QUEUE_PRIORITIES = ["VERY_HIGH", "HIGH", "NORMAL", "LOW", "VERY_LOW"]
def get_context_from_env_var() -> dict[Any, Any]:
"""
- Extract context from env variable, e.g. dag_id, task_id and execution_date,
- so that they can be used inside BashOperator and PythonOperator.
+ Extract context from env variable, (dag_id, task_id, etc) for use in BashOperator and PythonOperator.
:return: The context of interest.
"""
@@ -188,8 +187,7 @@ class HiveCliHook(BaseHook):
@staticmethod
def _prepare_hiveconf(d: dict[Any, Any]) -> list[Any]:
"""
- This function prepares a list of hiveconf params
- from a dictionary of key value pairs.
+ Prepares a list of hiveconf params from a dictionary of key value pairs.
:param d:
@@ -212,9 +210,10 @@ class HiveCliHook(BaseHook):
hive_conf: dict[Any, Any] | None = None,
) -> Any:
"""
- Run an hql statement using the hive cli. If hive_conf is specified
- it should be a dict and the entries will be set as key/value pairs
- in HiveConf.
+ Run an hql statement using the hive cli.
+
+ If hive_conf is specified it should be a dict and the entries
+ will be set as key/value pairs in HiveConf.
:param hql: an hql (hive query language) statement to run with hive cli
:param schema: Name of hive schema (database) to use
@@ -652,8 +651,9 @@ class HiveMetastoreHook(BaseHook):
def get_partitions(self, schema: str, table_name: str, partition_filter: str | None = None) -> list[Any]:
"""
- Returns a list of all partitions in a table. Works only
- for tables with less than 32767 (java short max val).
+ Returns a list of all partitions in a table.
+
+ Works only for tables with less than 32767 (java short max val).
For subpartitioned table, the number might easily exceed this.
>>> hh = HiveMetastoreHook()
@@ -689,9 +689,9 @@ class HiveMetastoreHook(BaseHook):
part_specs: list[Any], partition_key: str | None, filter_map: dict[str, Any] | None
) -> Any:
"""
- Helper method to get max partition of partitions with partition_key
- from part specs. key:value pair in filter_map will be used to
- filter out partitions.
+ Helper method to get max partition of partitions with partition_key from part specs.
+
+ key:value pair in filter_map will be used to filter out partitions.
:param part_specs: list of partition specs.
:param partition_key: partition key name.
@@ -736,6 +736,7 @@ class HiveMetastoreHook(BaseHook):
) -> Any:
"""
Returns the maximum value for all partitions with given field in a table.
+
If only one partition key exist in the table, the key will be used as field.
filter_map should be a partition_key:partition_value map and will be used to
filter out partitions.
@@ -1014,8 +1015,7 @@ class HiveServer2Hook(DbApiHook):
self, sql: str | list[str], parameters: Iterable | Mapping | None = None, **kwargs
) -> Any:
"""
- Get a set of records from a Hive query. You can optionally pass 'schema' kwarg
- which specifies target schema and default to 'default'.
+ Get a set of records from a Hive query; optionally pass a 'schema' kwarg to specify target schema.
:param sql: hql to be executed.
:param parameters: optional configuration passed to get_results
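
The ``_prepare_hiveconf`` docstring above describes turning a dict of key/value pairs into hiveconf CLI parameters. A minimal sketch of that idea, assuming a flat ``-hiveconf key=value`` argument layout; the exact layout used by HiveCliHook is not shown in this diff:

import itertools


def prepare_hiveconf(d):
    """Flatten {'k': 'v'} into ['-hiveconf', 'k=v', ...] pairs."""
    if not d:
        return []
    return list(itertools.chain(*[["-hiveconf", f"{k}={v}"] for k, v in d.items()]))


print(prepare_hiveconf({"airflow.ctx.dag_id": "example_dag", "mapreduce.job.queuename": "default"}))
# ['-hiveconf', 'airflow.ctx.dag_id=example_dag', '-hiveconf', 'mapreduce.job.queuename=default']
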
diff --git a/airflow/providers/apache/hive/macros/hive.py b/airflow/providers/apache/hive/macros/hive.py
index 7da0202eb2..382b0b680b 100644
--- a/airflow/providers/apache/hive/macros/hive.py
+++ b/airflow/providers/apache/hive/macros/hive.py
@@ -53,6 +53,7 @@ def max_partition(
def _closest_date(target_dt, date_list, before_target=None) -> datetime.date | None:
"""
This function finds the date in a list closest to the target date.
+
An optional parameter can be given to get the closest before or after.
:param target_dt: The target date
@@ -76,6 +77,7 @@ def closest_ds_partition(
) -> str | None:
"""
This function finds the date in a list closest to the target date.
+
An optional parameter can be given to get the closest before or after.
:param table: A hive table name
diff --git a/airflow/providers/apache/hive/sensors/metastore_partition.py b/airflow/providers/apache/hive/sensors/metastore_partition.py
index aaa0da12d7..043334e459 100644
--- a/airflow/providers/apache/hive/sensors/metastore_partition.py
+++ b/airflow/providers/apache/hive/sensors/metastore_partition.py
@@ -27,11 +27,11 @@ if TYPE_CHECKING:
class MetastorePartitionSensor(SqlSensor):
"""
- An alternative to the HivePartitionSensor that talk directly to the
- MySQL db. This was created as a result of observing sub optimal
- queries generated by the Metastore thrift service when hitting
- subpartitioned tables. The Thrift service's queries were written in a
- way that would not leverage the indexes.
+ An alternative to the HivePartitionSensor that talk directly to the MySQL db.
+
+ This was created as a result of observing sub optimal queries generated by the
+ Metastore thrift service when hitting subpartitioned tables. The Thrift service's
+ queries were written in a way that would not leverage the indexes.
:param schema: the schema
:param table: the table
diff --git a/airflow/providers/apache/hive/transfers/hive_to_mysql.py b/airflow/providers/apache/hive/transfers/hive_to_mysql.py
index 6d048d7b73..6eb08d2cc2 100644
--- a/airflow/providers/apache/hive/transfers/hive_to_mysql.py
+++ b/airflow/providers/apache/hive/transfers/hive_to_mysql.py
@@ -32,9 +32,10 @@ if TYPE_CHECKING:
class HiveToMySqlOperator(BaseOperator):
"""
- Moves data from Hive to MySQL, note that for now the data is loaded
- into memory before being pushed to MySQL, so this operator should
- be used for smallish amount of data.
+ Moves data from Hive to MySQL.
+
+ Note that for now the data is loaded into memory before being pushed
+ to MySQL, so this operator should be used for smallish amount of data.
:param sql: SQL query to execute against Hive server. (templated)
:param mysql_table: target MySQL table, use dot notation to target a
diff --git a/airflow/providers/apache/hive/transfers/hive_to_samba.py b/airflow/providers/apache/hive/transfers/hive_to_samba.py
index 0ad815ea9c..e81e695f47 100644
--- a/airflow/providers/apache/hive/transfers/hive_to_samba.py
+++ b/airflow/providers/apache/hive/transfers/hive_to_samba.py
@@ -32,8 +32,7 @@ if TYPE_CHECKING:
class HiveToSambaOperator(BaseOperator):
"""
- Executes hql code in a specific Hive database and loads the
- results of the query as a csv to a Samba location.
+ Execute hql code in a specific Hive database and load the results as a csv to a Samba location.
:param hql: the hql to be exported. (templated)
:param destination_filepath: the file path to where the file will be pushed onto samba
diff --git a/airflow/providers/apache/hive/transfers/mssql_to_hive.py b/airflow/providers/apache/hive/transfers/mssql_to_hive.py
index ccfef831e5..d803911956 100644
--- a/airflow/providers/apache/hive/transfers/mssql_to_hive.py
+++ b/airflow/providers/apache/hive/transfers/mssql_to_hive.py
@@ -35,11 +35,13 @@ if TYPE_CHECKING:
class MsSqlToHiveOperator(BaseOperator):
"""
- Moves data from Microsoft SQL Server to Hive. The operator runs
- your query against Microsoft SQL Server, stores the file locally
- before loading it into a Hive table. If the ``create`` or
- ``recreate`` arguments are set to ``True``,
- a ``CREATE TABLE`` and ``DROP TABLE`` statements are generated.
+ Moves data from Microsoft SQL Server to Hive.
+
+ The operator runs your query against Microsoft SQL Server, stores
+ the file locally before loading it into a Hive table. If the
+ ``create`` or ``recreate`` arguments are set to ``True``, a
+ ``CREATE TABLE`` and ``DROP TABLE`` statements are generated.
+
Hive data types are inferred from the cursor's metadata.
Note that the table generated in Hive uses ``STORED AS textfile``
which isn't the most efficient serialization format. If a
diff --git a/airflow/providers/apache/hive/transfers/mysql_to_hive.py b/airflow/providers/apache/hive/transfers/mysql_to_hive.py
index 003b6e3f83..d01433e16e 100644
--- a/airflow/providers/apache/hive/transfers/mysql_to_hive.py
+++ b/airflow/providers/apache/hive/transfers/mysql_to_hive.py
@@ -36,8 +36,11 @@ if TYPE_CHECKING:
class MySqlToHiveOperator(BaseOperator):
"""
- Moves data from MySql to Hive. The operator runs your query against
- MySQL, stores the file locally before loading it into a Hive table.
+ Moves data from MySql to Hive.
+
+ The operator runs your query against MySQL, stores the file locally
+ before loading it into a Hive table.
+
If the ``create`` or ``recreate`` arguments are set to ``True``,
a ``CREATE TABLE`` and ``DROP TABLE`` statements are generated.
Hive data types are inferred from the cursor's metadata. Note that the
diff --git a/airflow/providers/apache/hive/transfers/s3_to_hive.py b/airflow/providers/apache/hive/transfers/s3_to_hive.py
index a3f3184a44..16791611c0 100644
--- a/airflow/providers/apache/hive/transfers/s3_to_hive.py
+++ b/airflow/providers/apache/hive/transfers/s3_to_hive.py
@@ -37,8 +37,11 @@ if TYPE_CHECKING:
class S3ToHiveOperator(BaseOperator):
"""
- Moves data from S3 to Hive. The operator downloads a file from S3,
- stores the file locally before loading it into a Hive table.
+ Moves data from S3 to Hive.
+
+ The operator downloads a file from S3, stores the file locally
+ before loading it into a Hive table.
+
If the ``create`` or ``recreate`` arguments are set to ``True``,
a ``CREATE TABLE`` and ``DROP TABLE`` statements are generated.
Hive data types are inferred from the cursor's metadata from.
diff --git a/airflow/providers/apache/hive/transfers/vertica_to_hive.py b/airflow/providers/apache/hive/transfers/vertica_to_hive.py
index 781639fa15..81869ed95a 100644
--- a/airflow/providers/apache/hive/transfers/vertica_to_hive.py
+++ b/airflow/providers/apache/hive/transfers/vertica_to_hive.py
@@ -33,10 +33,11 @@ if TYPE_CHECKING:
class VerticaToHiveOperator(BaseOperator):
"""
- Moves data from Vertica to Hive. The operator runs
- your query against Vertica, stores the file locally
- before loading it into a Hive table. If the ``create`` or
- ``recreate`` arguments are set to ``True``,
+ Moves data from Vertica to Hive.
+
+ The operator runs your query against Vertica, stores the file
+ locally before loading it into a Hive table. If the ``create``
+ or ``recreate`` arguments are set to ``True``,
a ``CREATE TABLE`` and ``DROP TABLE`` statements are generated.
Hive data types are inferred from the cursor's metadata.
Note that the table generated in Hive uses ``STORED AS textfile``
diff --git a/airflow/providers/apache/kafka/sensors/kafka.py b/airflow/providers/apache/kafka/sensors/kafka.py
index 747c01ddc5..83b8e0220b 100644
--- a/airflow/providers/apache/kafka/sensors/kafka.py
+++ b/airflow/providers/apache/kafka/sensors/kafka.py
@@ -112,10 +112,8 @@ class AwaitMessageSensor(BaseOperator):
class AwaitMessageTriggerFunctionSensor(BaseOperator):
- """An Airflow sensor that defers until a specific message is published to
- Kafka, then triggers a registered function, and goes back to waiting for
- a message.
-
+ """
+ Defer until a specific message is published to Kafka, trigger a registered function, then resume waiting.
The behavior of the consumer for this trigger is as follows:
- poll the Kafka topics for a message
diff --git a/airflow/providers/apache/livy/operators/livy.py b/airflow/providers/apache/livy/operators/livy.py
index 673cb50ef3..fa4f357343 100644
--- a/airflow/providers/apache/livy/operators/livy.py
+++ b/airflow/providers/apache/livy/operators/livy.py
@@ -31,8 +31,7 @@ if TYPE_CHECKING:
class LivyOperator(BaseOperator):
"""
- This operator wraps the Apache Livy batch REST API, allowing to submit a Spark
- application to the underlying cluster.
+ Wraps the Apache Livy batch REST API, allowing to submit a Spark application to the underlying cluster.
:param file: path of the file containing the application to execute (required). (templated)
:param class_name: name of the application Java/Spark main class. (templated)
@@ -204,8 +203,8 @@ class LivyOperator(BaseOperator):
def execute_complete(self, context: Context, event: dict[str, Any]) -> Any:
"""
Callback for when the trigger fires - returns immediately.
- Relies on trigger to throw an exception, otherwise it assumes execution was
- successful.
+
+ Relies on trigger to throw an exception, otherwise it assumes execution was successful.
"""
# dump the logs from livy to worker through triggerer.
if event.get("log_lines", None) is not None:
diff --git a/airflow/providers/apache/pinot/hooks/pinot.py b/airflow/providers/apache/pinot/hooks/pinot.py
index caf0709671..1053e220af 100644
--- a/airflow/providers/apache/pinot/hooks/pinot.py
+++ b/airflow/providers/apache/pinot/hooks/pinot.py
@@ -32,6 +32,7 @@ from airflow.providers.common.sql.hooks.sql import DbApiHook
class PinotAdminHook(BaseHook):
"""
This hook is a wrapper around the pinot-admin.sh script.
+
For now, only small subset of its subcommands are implemented,
which are required to ingest offline data into Apache Pinot
(i.e., AddSchema, AddTable, CreateSegment, and UploadSegment).
diff --git a/airflow/providers/apache/spark/hooks/spark_jdbc.py b/airflow/providers/apache/spark/hooks/spark_jdbc.py
index 60ed0ebe57..f04c476f1a 100644
--- a/airflow/providers/apache/spark/hooks/spark_jdbc.py
+++ b/airflow/providers/apache/spark/hooks/spark_jdbc.py
@@ -26,8 +26,7 @@ from airflow.providers.apache.spark.hooks.spark_submit import SparkSubmitHook
class SparkJDBCHook(SparkSubmitHook):
"""
- This hook extends the SparkSubmitHook specifically for performing data
- transfers to/from JDBC-based databases with Apache Spark.
+ Extends the SparkSubmitHook for performing data transfers to/from JDBC-based databases with Apache Spark.
:param spark_app_name: Name of the job (default airflow-spark-jdbc)
:param spark_conn_id: The :ref:`spark connection id <howto/connection:spark>`
diff --git a/airflow/providers/apache/spark/hooks/spark_sql.py b/airflow/providers/apache/spark/hooks/spark_sql.py
index 685c3330d1..6864aa52fe 100644
--- a/airflow/providers/apache/spark/hooks/spark_sql.py
+++ b/airflow/providers/apache/spark/hooks/spark_sql.py
@@ -29,8 +29,7 @@ if TYPE_CHECKING:
class SparkSqlHook(BaseHook):
"""
- This hook is a wrapper around the spark-sql binary. It requires that the
- "spark-sql" binary is in the PATH.
+ This hook is a wrapper around the spark-sql binary; requires the "spark-sql" binary to be in the PATH.
:param sql: The SQL query to execute
:param conf: arbitrary Spark configuration property
@@ -112,8 +111,7 @@ class SparkSqlHook(BaseHook):
def _prepare_command(self, cmd: str | list[str]) -> list[str]:
"""
- Construct the spark-sql command to execute. Verbose output is enabled
- as default.
+ Construct the spark-sql command to execute. Verbose output is enabled as default.
:param cmd: command to append to the spark-sql command
:return: full command to be executed
diff --git a/airflow/providers/apache/spark/hooks/spark_submit.py b/airflow/providers/apache/spark/hooks/spark_submit.py
index 923bc6d89f..5687df8ac8 100644
--- a/airflow/providers/apache/spark/hooks/spark_submit.py
+++ b/airflow/providers/apache/spark/hooks/spark_submit.py
@@ -38,8 +38,7 @@ ALLOWED_SPARK_BINARIES = ["spark-submit", "spark2-submit", "spark3-submit"]
class SparkSubmitHook(BaseHook, LoggingMixin):
"""
- This hook is a wrapper around the spark-submit binary to kick off a spark-submit job.
- It requires that the "spark-submit" binary is in the PATH.
+ Wrap the spark-submit binary to kick off a spark-submit job; requires "spark-submit" binary in the PATH.
:param conf: Arbitrary Spark configuration properties
:param spark_conn_id: The :ref:`spark connection id <howto/connection:spark>` as configured
@@ -520,6 +519,7 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
def _start_driver_status_tracking(self) -> None:
"""
Polls the driver based on self._driver_id to get the status.
+
Finish successfully when the status is FINISHED.
Finish failed when the status is ERROR/UNKNOWN/KILLED/FAILED.
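
The ``_start_driver_status_tracking`` docstring above names FINISHED as the success state and ERROR/UNKNOWN/KILLED/FAILED as failure states. A small sketch of that polling rule follows; the loop shape and interval are illustrative assumptions, not the hook's actual code.

import time

FAILURE_STATES = {"ERROR", "UNKNOWN", "KILLED", "FAILED"}


def track_driver_status(get_status, poll_interval=1.0, max_polls=10):
    """Poll get_status() until a terminal Spark driver state is observed."""
    for _ in range(max_polls):
        status = get_status()
        if status == "FINISHED":
            return True
        if status in FAILURE_STATES:
            raise RuntimeError(f"spark driver ended in state {status}")
        time.sleep(poll_interval)
    raise TimeoutError("driver did not reach a terminal state")


statuses = iter(["SUBMITTED", "RUNNING", "FINISHED"])
print(track_driver_status(lambda: next(statuses), poll_interval=0))  # True
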
diff --git a/airflow/providers/apache/spark/operators/spark_jdbc.py b/airflow/providers/apache/spark/operators/spark_jdbc.py
index 29312d8a7c..70eb224828 100644
--- a/airflow/providers/apache/spark/operators/spark_jdbc.py
+++ b/airflow/providers/apache/spark/operators/spark_jdbc.py
@@ -28,10 +28,9 @@ if TYPE_CHECKING:
class SparkJDBCOperator(SparkSubmitOperator):
"""
- This operator extends the SparkSubmitOperator specifically for performing data
- transfers to/from JDBC-based databases with Apache Spark. As with the
- SparkSubmitOperator, it assumes that the "spark-submit" binary is available on the
- PATH.
+ Extend the SparkSubmitOperator to perform data transfers to/from JDBC-based databases with Apache Spark.
+
+ As with the SparkSubmitOperator, it assumes that the "spark-submit" binary is available on the PATH.
.. seealso::
For more information on how to use this operator, take a look at the guide:
diff --git a/airflow/providers/apache/spark/operators/spark_submit.py b/airflow/providers/apache/spark/operators/spark_submit.py
index c2a5dfc8b2..0620a44c0e 100644
--- a/airflow/providers/apache/spark/operators/spark_submit.py
+++ b/airflow/providers/apache/spark/operators/spark_submit.py
@@ -29,8 +29,7 @@ if TYPE_CHECKING:
class SparkSubmitOperator(BaseOperator):
"""
- This hook is a wrapper around the spark-submit binary to kick off a spark-submit job.
- It requires that the "spark-submit" binary is in the PATH.
+ Wrap the spark-submit binary to kick off a spark-submit job; requires "spark-submit" binary in the PATH.
.. seealso::
For more information on how to use this operator, take a look at the guide:
diff --git a/airflow/providers/apprise/hooks/apprise.py b/airflow/providers/apprise/hooks/apprise.py
index 37a1d520ca..61548be76a 100644
--- a/airflow/providers/apprise/hooks/apprise.py
+++ b/airflow/providers/apprise/hooks/apprise.py
@@ -28,6 +28,7 @@ from apprise import AppriseConfig, NotifyFormat, NotifyType
class AppriseHook(BaseHook):
"""
Use Apprise(https://github.com/caronc/apprise) to interact with notification services.
+
The complete list of notification services supported by Apprise can be found at:
https://github.com/caronc/apprise/wiki#notification-services.
diff --git a/airflow/providers/arangodb/hooks/arangodb.py b/airflow/providers/arangodb/hooks/arangodb.py
index 23b3aa4124..1495c598a5 100644
--- a/airflow/providers/arangodb/hooks/arangodb.py
+++ b/airflow/providers/arangodb/hooks/arangodb.py
@@ -93,8 +93,7 @@ class ArangoDBHook(BaseHook):
def query(self, query, **kwargs) -> Cursor:
"""
- Function to create an ArangoDB session
- and execute the AQL query in the session.
+ Function to create an ArangoDB session and execute the AQL query in the session.
:param query: AQL query
"""
diff --git a/airflow/providers/asana/hooks/asana.py b/airflow/providers/asana/hooks/asana.py
index b3d37504b0..6278288877 100644
--- a/airflow/providers/asana/hooks/asana.py
+++ b/airflow/providers/asana/hooks/asana.py
@@ -28,10 +28,7 @@ from airflow.hooks.base import BaseHook
def _ensure_prefixes(conn_type):
- """
- Remove when provider min airflow version >= 2.5.0 since this is handled by
- provider manager from that version.
- """
+ """Remove when provider min airflow version >= 2.5.0 since this is now handled by provider manager."""
def dec(func):
@wraps(func)
diff --git a/airflow/providers/celery/sensors/celery_queue.py b/airflow/providers/celery/sensors/celery_queue.py
index 78e412c06a..4533217bff 100644
--- a/airflow/providers/celery/sensors/celery_queue.py
+++ b/airflow/providers/celery/sensors/celery_queue.py
@@ -29,9 +29,10 @@ if TYPE_CHECKING:
class CeleryQueueSensor(BaseSensorOperator):
"""
- Waits for a Celery queue to be empty. By default, in order to be considered
- empty, the queue must not have any tasks in the ``reserved``, ``scheduled``
- or ``active`` states.
+ Waits for a Celery queue to be empty.
+
+ By default, in order to be considered empty, the queue must not have
+ any tasks in the ``reserved``, ``scheduled`` or ``active`` states.
:param celery_queue: The name of the Celery queue to wait for.
:param target_task_id: Task id for checking
@@ -45,9 +46,7 @@ class CeleryQueueSensor(BaseSensorOperator):
def _check_task_id(self, context: Context) -> bool:
"""
- Gets the returned Celery result from the Airflow task
- ID provided to the sensor, and returns True if the
- celery result has been finished execution.
+ Get the Celery result from the Airflow task ID and return True if the result has finished execution.
:param context: Airflow's execution context
:return: True if task has been executed, otherwise False
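
The CeleryQueueSensor docstring above defines emptiness as having no tasks in the ``reserved``, ``scheduled`` or ``active`` states. Below is a hedged sketch of that rule applied to pre-fetched inspection buckets; the bucket shape (worker name mapped to task dicts carrying a routing key) is an assumption for illustration, not taken from the sensor's code.

def queue_is_empty(queue_name, reserved, scheduled, active):
    """Return True if no task in any bucket is routed to queue_name."""
    for bucket in (reserved, scheduled, active):
        for tasks in (bucket or {}).values():
            for task in tasks:
                if task.get("delivery_info", {}).get("routing_key") == queue_name:
                    return False
    return True


reserved = {"worker1": [{"id": "abc", "delivery_info": {"routing_key": "reports"}}]}
print(queue_is_empty("reports", reserved, {}, {}))  # False
print(queue_is_empty("default", reserved, {}, {}))  # True
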
diff --git a/airflow/providers/cncf/kubernetes/hooks/kubernetes.py b/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
index a3851a8987..f237461155 100644
--- a/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
+++ b/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
@@ -162,6 +162,8 @@ class KubernetesHook(BaseHook, PodOperatorHookProtocol):
def _get_field(self, field_name):
"""
+ Handles backcompat for extra fields.
+
Prior to Airflow 2.3, in order to make use of UI customizations for extra fields,
we needed to store them with the prefix ``extra__kubernetes__``. This method
handles the backcompat, i.e. if the extra dict contains prefixed fields.
@@ -429,10 +431,7 @@ class KubernetesHook(BaseHook, PodOperatorHookProtocol):
def _get_bool(val) -> bool | None:
- """
- Converts val to bool if can be done with certainty.
- If we cannot infer intention we return None.
- """
+ """Converts val to bool if can be done with certainty; if we cannot infer intention we return None."""
if isinstance(val, bool):
return val
elif isinstance(val, str):
@@ -569,9 +568,12 @@ class AsyncKubernetesHook(KubernetesHook):
async def read_logs(self, name: str, namespace: str):
"""
- Reads logs inside the pod while starting containers inside. All the logs will be outputted with its
- timestamp to track the logs after the execution of the pod is completed. The method is used for async
- output of the logs only in the pod failed it execution or the task was cancelled by the user.
+ Reads logs inside the pod while starting containers inside.
+
+ All the logs will be outputted with its timestamp to track
+ the logs after the execution of the pod is completed. The
+ method is used for async output of the logs only in the pod
+ failed it execution or the task was cancelled by the user.
:param name: Name of the pod.
:param namespace: Name of the pod's namespace.
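
The ``_get_bool`` docstring above promises a bool only when the intent is certain and ``None`` otherwise. A minimal standalone sketch of that contract; the accepted string spellings are assumptions for illustration and may differ from the hook's code.

def get_bool(val):
    """Convert val to bool when certain; return None when intent is unclear."""
    if isinstance(val, bool):
        return val
    if isinstance(val, str):
        lowered = val.strip().lower()
        if lowered == "true":
            return True
        if lowered == "false":
            return False
    return None


assert get_bool(True) is True
assert get_bool("  FALSE ") is False
assert get_bool("maybe") is None
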
diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py b/airflow/providers/cncf/kubernetes/operators/pod.py
index 2ca80b84f5..87bfc41301 100644
--- a/airflow/providers/cncf/kubernetes/operators/pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -867,6 +867,7 @@ class KubernetesPodOperator(BaseOperator):
def dry_run(self) -> None:
"""
Prints out the pod definition that would be created by this operator.
+
Does not include labels specific to the task instance (since there isn't
one in a dry_run) and excludes all empty elements.
"""
diff --git a/airflow/providers/cncf/kubernetes/triggers/pod.py b/airflow/providers/cncf/kubernetes/triggers/pod.py
index 0a7b41b8c0..e2b5edd8e4 100644
--- a/airflow/providers/cncf/kubernetes/triggers/pod.py
+++ b/airflow/providers/cncf/kubernetes/triggers/pod.py
@@ -32,7 +32,8 @@ from airflow.triggers.base import BaseTrigger, TriggerEvent
class ContainerState(str, Enum):
"""
- Possible container states
+ Possible container states.
+
See https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#pod-phase.
"""
diff --git a/airflow/providers/cncf/kubernetes/utils/delete_from.py b/airflow/providers/cncf/kubernetes/utils/delete_from.py
index 59d1896b78..98d1c4d6d9 100644
--- a/airflow/providers/cncf/kubernetes/utils/delete_from.py
+++ b/airflow/providers/cncf/kubernetes/utils/delete_from.py
@@ -142,10 +142,7 @@ def _delete_from_yaml_single_item(
class FailToDeleteError(Exception):
- """
- An exception class for handling error if an error occurred when
- handling a yaml file during deletion of the resource.
- """
+ """For handling error if an error occurred when handling a yaml file during deletion of the resource."""
def __init__(self, api_exceptions: list):
self.api_exceptions = api_exceptions
diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index 6d1cefa3b9..3bacb95f4f 100644
--- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -63,7 +63,8 @@ def should_retry_start_pod(exception: BaseException) -> bool:
class PodPhase:
"""
- Possible pod phases
+ Possible pod phases.
+
See https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#pod-phase.
"""
@@ -114,6 +115,7 @@ def get_container_status(pod: V1Pod, container_name: str) -> V1ContainerStatus |
def container_is_running(pod: V1Pod, container_name: str) -> bool:
"""
Examines V1Pod ``pod`` to determine whether ``container_name`` is running.
+
If that container is present and running, returns True. Returns False otherwise.
"""
container_status = get_container_status(pod, container_name)
@@ -125,6 +127,7 @@ def container_is_running(pod: V1Pod, container_name: str) -> bool:
def container_is_terminated(pod: V1Pod, container_name: str) -> bool:
"""
Examines V1Pod ``pod`` to determine whether ``container_name`` is terminated.
+
If that container is present and terminated, returns True. Returns False otherwise.
"""
container_statuses = pod.status.container_statuses if pod and pod.status else None
@@ -145,8 +148,7 @@ def get_container_termination_message(pod: V1Pod, container_name: str):
class PodLogsConsumer:
"""
- PodLogsConsumer is responsible for pulling pod logs from a stream with checking a container status before
- reading data.
+ Responsible for pulling pod logs from a stream with checking a container status before reading data.
This class is a workaround for the issue https://github.com/apache/airflow/issues/23497.
@@ -239,10 +241,7 @@ class PodLoggingStatus:
class PodManager(LoggingMixin):
- """
- Helper class for creating, monitoring, and otherwise interacting with Kubernetes pods
- for use with the KubernetesPodOperator.
- """
+ """Create, monitor, and otherwise interact with Kubernetes pods for use with the KubernetesPodOperator."""
def __init__(
self,
@@ -358,6 +357,7 @@ class PodManager(LoggingMixin):
) -> DateTime | None:
"""
Tries to follow container logs until container completes.
+
For a long-running container, sometimes the log read may be interrupted
Such errors of this kind are suppressed.
diff --git a/airflow/providers/cncf/kubernetes/utils/xcom_sidecar.py b/airflow/providers/cncf/kubernetes/utils/xcom_sidecar.py
index bb999b5a54..11a62f9c5d 100644
--- a/airflow/providers/cncf/kubernetes/utils/xcom_sidecar.py
+++ b/airflow/providers/cncf/kubernetes/utils/xcom_sidecar.py
@@ -14,11 +14,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-"""
-This module handles all xcom functionality for the KubernetesPodOperator
-by attaching a sidecar container that blocks the pod from completing until
-Airflow has pulled result data into the worker for xcom serialization.
-"""
+"""Attach a sidecar container that blocks the pod from completing until Airflow pulls result data."""
from __future__ import annotations
import copy
diff --git a/airflow/providers/common/sql/operators/sql.py b/airflow/providers/common/sql/operators/sql.py
index 5a3783acb5..a8eea3d6d8 100644
--- a/airflow/providers/common/sql/operators/sql.py
+++ b/airflow/providers/common/sql/operators/sql.py
@@ -294,6 +294,7 @@ class SQLExecuteQueryOperator(BaseSQLOperator):
class SQLColumnCheckOperator(BaseSQLOperator):
"""
Performs one or more of the templated checks in the column_checks dictionary.
+
Checks are performed on a per-column basis specified by the column_mapping.
Each check can take one or more of the following options:
@@ -540,6 +541,7 @@ class SQLColumnCheckOperator(BaseSQLOperator):
class SQLTableCheckOperator(BaseSQLOperator):
"""
Performs one or more of the checks provided in the checks dictionary.
+
Checks should be written to return a boolean result.
:param table: the table to run checks on
@@ -650,10 +652,11 @@ class SQLTableCheckOperator(BaseSQLOperator):
class SQLCheckOperator(BaseSQLOperator):
"""
- Performs checks against a db. The ``SQLCheckOperator`` expects
- a sql query that will return a single row. Each value on that
- first row is evaluated using python ``bool`` casting. If any of the
- values return ``False`` the check is failed and errors out.
+ Performs checks against a db.
+
+ The ``SQLCheckOperator`` expects a sql query that will return a single row.
+ Each value on that first row is evaluated using python ``bool`` casting.
+ If any of the values return ``False`` the check is failed and errors out.
Note that Python bool casting evals the following as ``False``:
@@ -808,8 +811,7 @@ class SQLValueCheckOperator(BaseSQLOperator):
class SQLIntervalCheckOperator(BaseSQLOperator):
"""
- Checks that the values of metrics given as SQL expressions are within
- a certain tolerance of the ones from days_back before.
+ Check that metrics given as SQL expressions are within tolerance of the ones from days_back before.
:param table: the table name
:param conn_id: the connection ID used to connect to the database.
@@ -946,9 +948,9 @@ class SQLIntervalCheckOperator(BaseSQLOperator):
class SQLThresholdCheckOperator(BaseSQLOperator):
"""
- Performs a value check using sql code against a minimum threshold
- and a maximum threshold. Thresholds can be in the form of a numeric
- value OR a sql statement that results a numeric.
+ Performs a value check using sql code against a minimum threshold and a maximum threshold.
+
+ Thresholds can be in the form of a numeric value OR a sql statement that results a numeric.
:param sql: the sql to be executed. (templated)
:param conn_id: the connection ID used to connect to the database.
@@ -1028,6 +1030,7 @@ class SQLThresholdCheckOperator(BaseSQLOperator):
def push(self, meta_data):
"""
Optional: Send data check info and metadata to an external database.
+
Default functionality will log metadata.
"""
info = "\n".join(f"""{key}: {item}""" for key, item in meta_data.items())
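
The SQLCheckOperator docstring above says each value of the single returned row is evaluated with Python ``bool`` casting, and any ``False`` fails the check. A tiny illustrative sketch of that semantics, not the operator's code:

def row_passes_check(first_row):
    """Return True only if every value in the row is truthy under bool()."""
    return all(bool(value) for value in first_row)


print(row_passes_check([1, "ok", 3.5]))   # True
print(row_passes_check([1, 0, "ok"]))     # False -> the check would error out
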