This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new a473facf6c Add D400 pydocstyle check - Apache providers only (#31424)
a473facf6c is described below
commit a473facf6c0b36f7d051ecc2d1aa94ba6957468d
Author: Vincent <[email protected]>
AuthorDate: Wed May 31 22:36:49 2023 -0400
Add D400 pydocstyle check - Apache providers only (#31424)
---
airflow/providers/apache/beam/hooks/beam.py | 6 ++--
.../providers/apache/cassandra/hooks/cassandra.py | 8 ++---
airflow/providers/apache/drill/hooks/drill.py | 2 +-
airflow/providers/apache/druid/hooks/druid.py | 8 ++---
airflow/providers/apache/druid/operators/druid.py | 2 +-
.../apache/druid/transfers/hive_to_druid.py | 2 +-
.../apache/flink/operators/flink_kubernetes.py | 2 +-
.../apache/flink/sensors/flink_kubernetes.py | 2 +-
airflow/providers/apache/hdfs/hooks/webhdfs.py | 4 +--
airflow/providers/apache/hdfs/sensors/web_hdfs.py | 2 +-
airflow/providers/apache/hive/hooks/hive.py | 28 ++++++++--------
airflow/providers/apache/hive/operators/hive.py | 2 +-
.../providers/apache/hive/operators/hive_stats.py | 6 ++--
.../apache/hive/transfers/vertica_to_hive.py | 5 +--
airflow/providers/apache/kafka/hooks/base.py | 10 +++---
airflow/providers/apache/kafka/hooks/client.py | 4 +--
airflow/providers/apache/kafka/hooks/consume.py | 2 +-
airflow/providers/apache/kafka/hooks/produce.py | 4 +--
.../providers/apache/kafka/operators/produce.py | 2 +-
.../apache/kafka/triggers/await_message.py | 2 +-
airflow/providers/apache/kylin/hooks/kylin.py | 4 +--
.../providers/apache/kylin/operators/kylin_cube.py | 2 +-
airflow/providers/apache/livy/hooks/livy.py | 37 +++++++++++-----------
airflow/providers/apache/livy/triggers/livy.py | 4 +--
airflow/providers/apache/pig/hooks/pig.py | 4 +--
airflow/providers/apache/pinot/hooks/pinot.py | 12 +++----
airflow/providers/apache/spark/hooks/spark_jdbc.py | 2 +-
.../apache/spark/hooks/spark_jdbc_script.py | 6 ++--
airflow/providers/apache/spark/hooks/spark_sql.py | 4 +--
.../providers/apache/spark/hooks/spark_submit.py | 11 ++++---
.../providers/apache/spark/operators/spark_jdbc.py | 2 +-
.../providers/apache/spark/operators/spark_sql.py | 6 ++--
.../apache/spark/operators/spark_submit.py | 2 +-
airflow/providers/apache/sqoop/hooks/sqoop.py | 12 +++----
airflow/providers/apache/sqoop/operators/sqoop.py | 8 ++---
35 files changed, 112 insertions(+), 107 deletions(-)
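For context, pydocstyle's D400 rule ("First line should end with a period") is what drives every hunk below: the first line of each docstring gains a trailing period, and where a summary spilled onto extra lines a blank line now separates it from the rest. A minimal sketch of the pattern, using a hypothetical method name rather than one taken from this diff:

    def get_widget(self):
        """Return the widget"""    # before: flagged by D400 (no trailing period)

    def get_widget(self):
        """Return the widget."""   # after: first line ends with a period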
diff --git a/airflow/providers/apache/beam/hooks/beam.py b/airflow/providers/apache/beam/hooks/beam.py
index 9ea9e7b79b..264d9076e0 100644
--- a/airflow/providers/apache/beam/hooks/beam.py
+++ b/airflow/providers/apache/beam/hooks/beam.py
@@ -42,8 +42,8 @@ from airflow.utils.python_virtualenv import prepare_virtualenv
class BeamRunnerType:
"""
Helper class for listing runner types.
- For more information about runners see:
- https://beam.apache.org/documentation/
+
+ For more information about runners see: https://beam.apache.org/documentation/
"""
DataflowRunner = "DataflowRunner"
@@ -58,7 +58,7 @@ class BeamRunnerType:
def beam_options_to_args(options: dict) -> list[str]:
"""
- Returns a formatted pipeline options from a dictionary of arguments
+ Returns a formatted pipeline options from a dictionary of arguments.
The logic of this method should be compatible with Apache Beam:
https://github.com/apache/beam/blob/b56740f0e8cd80c2873412847d0b336837429fb9/sdks/python/
diff --git a/airflow/providers/apache/cassandra/hooks/cassandra.py b/airflow/providers/apache/cassandra/hooks/cassandra.py
index e058556c21..84f10c85b2 100644
--- a/airflow/providers/apache/cassandra/hooks/cassandra.py
+++ b/airflow/providers/apache/cassandra/hooks/cassandra.py
@@ -37,7 +37,7 @@ Policy = Union[DCAwareRoundRobinPolicy, RoundRobinPolicy, TokenAwarePolicy, Whit
class CassandraHook(BaseHook, LoggingMixin):
"""
- Hook used to interact with Cassandra
+ Hook used to interact with Cassandra.
Contact points can be specified as a comma-separated string in the 'hosts'
field of the connection.
@@ -125,7 +125,7 @@ class CassandraHook(BaseHook, LoggingMixin):
self.session = None
def get_conn(self) -> Session:
- """Returns a cassandra Session object"""
+ """Returns a cassandra Session object."""
if self.session and not self.session.is_shutdown:
return self.session
self.session = self.cluster.connect(self.keyspace)
@@ -177,7 +177,7 @@ class CassandraHook(BaseHook, LoggingMixin):
def table_exists(self, table: str) -> bool:
"""
- Checks if a table exists in Cassandra
+ Checks if a table exists in Cassandra.
:param table: Target Cassandra table.
Use dot notation to target a specific keyspace.
@@ -190,7 +190,7 @@ class CassandraHook(BaseHook, LoggingMixin):
def record_exists(self, table: str, keys: dict[str, str]) -> bool:
"""
- Checks if a record exists in Cassandra
+ Checks if a record exists in Cassandra.
:param table: Target Cassandra table.
Use dot notation to target a specific keyspace.
diff --git a/airflow/providers/apache/drill/hooks/drill.py b/airflow/providers/apache/drill/hooks/drill.py
index 5785cb0515..ab15ba6b66 100644
--- a/airflow/providers/apache/drill/hooks/drill.py
+++ b/airflow/providers/apache/drill/hooks/drill.py
@@ -64,7 +64,7 @@ class DrillHook(DbApiHook):
def get_uri(self) -> str:
"""
- Returns the connection URI
+ Returns the connection URI.
e.g: ``drill://localhost:8047/dfs``
"""
diff --git a/airflow/providers/apache/druid/hooks/druid.py b/airflow/providers/apache/druid/hooks/druid.py
index abfd86d68e..5b5c814fb5 100644
--- a/airflow/providers/apache/druid/hooks/druid.py
+++ b/airflow/providers/apache/druid/hooks/druid.py
@@ -30,7 +30,7 @@ from airflow.providers.common.sql.hooks.sql import DbApiHook
class DruidHook(BaseHook):
"""
- Connection to Druid overlord for ingestion
+ Connection to Druid overlord for ingestion.
To connect to a Druid cluster that is secured with the druid-basic-security
extension, add the username and password to the druid ingestion connection.
@@ -60,7 +60,7 @@ class DruidHook(BaseHook):
raise ValueError("Druid timeout should be equal or greater than 1")
def get_conn_url(self) -> str:
- """Get Druid connection url"""
+ """Get Druid connection url."""
conn = self.get_connection(self.druid_ingest_conn_id)
host = conn.host
port = conn.port
@@ -83,7 +83,7 @@ class DruidHook(BaseHook):
return None
def submit_indexing_job(self, json_index_spec: dict[str, Any] | str) -> None:
- """Submit Druid ingestion job"""
+ """Submit Druid ingestion job."""
url = self.get_conn_url()
self.log.info("Druid ingestion spec: %s", json_index_spec)
@@ -131,7 +131,7 @@ class DruidHook(BaseHook):
class DruidDbApiHook(DbApiHook):
"""
- Interact with Druid broker
+ Interact with Druid broker.
This hook is purely for users to query druid broker.
For ingestion, please use druidHook.
diff --git a/airflow/providers/apache/druid/operators/druid.py b/airflow/providers/apache/druid/operators/druid.py
index 2ef39886da..7e1cd60799 100644
--- a/airflow/providers/apache/druid/operators/druid.py
+++ b/airflow/providers/apache/druid/operators/druid.py
@@ -28,7 +28,7 @@ if TYPE_CHECKING:
class DruidOperator(BaseOperator):
"""
- Allows to submit a task directly to druid
+ Allows to submit a task directly to druid.
:param json_index_file: The filepath to the druid index specification
:param druid_ingest_conn_id: The connection id of the Druid overlord which
diff --git a/airflow/providers/apache/druid/transfers/hive_to_druid.py b/airflow/providers/apache/druid/transfers/hive_to_druid.py
index 4c7523dd8d..174046ccfe 100644
--- a/airflow/providers/apache/druid/transfers/hive_to_druid.py
+++ b/airflow/providers/apache/druid/transfers/hive_to_druid.py
@@ -35,7 +35,7 @@ class HiveToDruidOperator(BaseOperator):
"""
Moves data from Hive to Druid, [del]note that for now the data is loaded
into memory before being pushed to Druid, so this operator should
- be used for smallish amount of data.[/del]
+ be used for smallish amount of data.[/del].
:param sql: SQL query to execute against the Druid database. (templated)
:param druid_datasource: the datasource you want to ingest into in druid
diff --git a/airflow/providers/apache/flink/operators/flink_kubernetes.py b/airflow/providers/apache/flink/operators/flink_kubernetes.py
index 245964a730..25f66d1a73 100644
--- a/airflow/providers/apache/flink/operators/flink_kubernetes.py
+++ b/airflow/providers/apache/flink/operators/flink_kubernetes.py
@@ -31,7 +31,7 @@ if TYPE_CHECKING:
class FlinkKubernetesOperator(BaseOperator):
"""
- Creates flinkDeployment object in kubernetes cluster:
+ Creates flinkDeployment object in kubernetes cluster.
.. seealso::
For more information on how to use this operator, take a look at the guide:
diff --git a/airflow/providers/apache/flink/sensors/flink_kubernetes.py b/airflow/providers/apache/flink/sensors/flink_kubernetes.py
index 3151a63ec5..a28a15cd4e 100644
--- a/airflow/providers/apache/flink/sensors/flink_kubernetes.py
+++ b/airflow/providers/apache/flink/sensors/flink_kubernetes.py
@@ -31,7 +31,7 @@ if TYPE_CHECKING:
class FlinkKubernetesSensor(BaseSensorOperator):
"""
- Checks flinkDeployment object in kubernetes cluster:
+ Checks flinkDeployment object in kubernetes cluster.
.. seealso::
For more detail about Flink Deployment Object have a look at the reference:
diff --git a/airflow/providers/apache/hdfs/hooks/webhdfs.py b/airflow/providers/apache/hdfs/hooks/webhdfs.py
index 67608c481c..f80b931f81 100644
--- a/airflow/providers/apache/hdfs/hooks/webhdfs.py
+++ b/airflow/providers/apache/hdfs/hooks/webhdfs.py
@@ -15,7 +15,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-"""Hook for Web HDFS"""
+"""Hook for Web HDFS."""
from __future__ import annotations
import logging
@@ -41,7 +41,7 @@ if _kerberos_security_mode:
class AirflowWebHDFSHookException(AirflowException):
- """Exception specific for WebHDFS hook"""
+ """Exception specific for WebHDFS hook."""
class WebHDFSHook(BaseHook):
diff --git a/airflow/providers/apache/hdfs/sensors/web_hdfs.py b/airflow/providers/apache/hdfs/sensors/web_hdfs.py
index 38e1047679..b091853254 100644
--- a/airflow/providers/apache/hdfs/sensors/web_hdfs.py
+++ b/airflow/providers/apache/hdfs/sensors/web_hdfs.py
@@ -26,7 +26,7 @@ if TYPE_CHECKING:
class WebHdfsSensor(BaseSensorOperator):
- """Waits for a file or folder to land in HDFS"""
+ """Waits for a file or folder to land in HDFS."""
template_fields: Sequence[str] = ("filepath",)
diff --git a/airflow/providers/apache/hive/hooks/hive.py b/airflow/providers/apache/hive/hooks/hive.py
index 6aa03ff75a..89548fea8d 100644
--- a/airflow/providers/apache/hive/hooks/hive.py
+++ b/airflow/providers/apache/hive/hooks/hive.py
@@ -137,7 +137,7 @@ class HiveCliHook(BaseHook):
return proxy_user_value # The default proxy user (undefined)
def _prepare_cli_cmd(self) -> list[Any]:
- """This function creates the command list from available information"""
+ """This function creates the command list from available information."""
conn = self.conn
hive_bin = "hive"
cmd_extra = []
@@ -296,7 +296,7 @@ class HiveCliHook(BaseHook):
return stdout
def test_hql(self, hql: str) -> None:
- """Test an hql statement using the hive cli and EXPLAIN"""
+ """Test an hql statement using the hive cli and EXPLAIN."""
create, insert, other = [], [], []
for query in hql.split(";"): # naive
query_original = query
@@ -415,7 +415,7 @@ class HiveCliHook(BaseHook):
tblproperties: dict[str, Any] | None = None,
) -> None:
"""
- Loads a local file into Hive
+ Loads a local file into Hive.
Note that the table generated in Hive uses ``STORED AS textfile``
which isn't the most efficient serialization format. If a
@@ -475,7 +475,7 @@ class HiveCliHook(BaseHook):
self.run_cli(hql)
def kill(self) -> None:
- """Kill Hive cli command"""
+ """Kill Hive cli command."""
if hasattr(self, "sub_process"):
if self.sub_process.poll() is None:
print("Killing the Hive job")
@@ -486,7 +486,7 @@ class HiveCliHook(BaseHook):
class HiveMetastoreHook(BaseHook):
"""
- Wrapper to interact with the Hive Metastore
+ Wrapper to interact with the Hive Metastore.
:param metastore_conn_id: reference to the
:ref: `metastore thrift service connection id <howto/connection:hive_metastore>`.
@@ -587,7 +587,7 @@ class HiveMetastoreHook(BaseHook):
def check_for_partition(self, schema: str, table: str, partition: str) -> bool:
"""
- Checks whether a partition exists
+ Checks whether a partition exists.
:param schema: Name of hive schema (database) @table belongs to
:param table: Name of hive table @partition belongs to
@@ -608,7 +608,7 @@ class HiveMetastoreHook(BaseHook):
def check_for_named_partition(self, schema: str, table: str, partition_name: str) -> Any:
"""
- Checks whether a partition with a given name exists
+ Checks whether a partition with a given name exists.
:param schema: Name of hive schema (database) @table belongs to
:param table: Name of hive table @partition belongs to
@@ -625,7 +625,7 @@ class HiveMetastoreHook(BaseHook):
return client.check_for_named_partition(schema, table, partition_name)
def get_table(self, table_name: str, db: str = "default") -> Any:
- """Get a metastore table object
+ """Get a metastore table object.
>>> hh = HiveMetastoreHook()
>>> t = hh.get_table(db='airflow', table_name='static_babynames')
@@ -640,13 +640,13 @@ class HiveMetastoreHook(BaseHook):
return client.get_table(dbname=db, tbl_name=table_name)
def get_tables(self, db: str, pattern: str = "*") -> Any:
- """Get a metastore table object"""
+ """Get a metastore table object."""
with self.metastore as client:
tables = client.get_tables(db_name=db, pattern=pattern)
return client.get_table_objects_by_name(db, tables)
def get_databases(self, pattern: str = "*") -> Any:
- """Get a metastore table object"""
+ """Get a metastore table object."""
with self.metastore as client:
return client.get_databases(pattern)
@@ -774,7 +774,7 @@ class HiveMetastoreHook(BaseHook):
def table_exists(self, table_name: str, db: str = "default") -> bool:
"""
- Check if table exists
+ Check if table exists.
>>> hh = HiveMetastoreHook()
>>> hh.table_exists(db='airflow', table_name='static_babynames')
@@ -790,7 +790,7 @@ class HiveMetastoreHook(BaseHook):
def drop_partitions(self, table_name, part_vals, delete_data=False, db="default"):
"""
- Drop partitions from the given table matching the part_vals input
+ Drop partitions from the given table matching the part_vals input.
:param table_name: table name.
:param part_vals: list of partition specs.
@@ -816,7 +816,7 @@ class HiveMetastoreHook(BaseHook):
class HiveServer2Hook(DbApiHook):
"""
- Wrapper around the pyhive library
+ Wrapper around the pyhive library.
Notes:
* the default auth_mechanism is PLAIN, to override it you
@@ -1037,7 +1037,7 @@ class HiveServer2Hook(DbApiHook):
**kwargs,
) -> pandas.DataFrame:
"""
- Get a pandas dataframe from a Hive query
+ Get a pandas dataframe from a Hive query.
:param sql: hql to be executed.
:param schema: target schema, default to 'default'.
diff --git a/airflow/providers/apache/hive/operators/hive.py b/airflow/providers/apache/hive/operators/hive.py
index d76aa8e880..71bd8ac49b 100644
--- a/airflow/providers/apache/hive/operators/hive.py
+++ b/airflow/providers/apache/hive/operators/hive.py
@@ -123,7 +123,7 @@ class HiveOperator(BaseOperator):
self.hook: HiveCliHook | None = None
def get_hook(self) -> HiveCliHook:
- """Get Hive cli hook"""
+ """Get Hive cli hook."""
return HiveCliHook(
hive_cli_conn_id=self.hive_cli_conn_id,
run_as=self.run_as,
diff --git a/airflow/providers/apache/hive/operators/hive_stats.py b/airflow/providers/apache/hive/operators/hive_stats.py
index caa6770a8f..16b9c61200 100644
--- a/airflow/providers/apache/hive/operators/hive_stats.py
+++ b/airflow/providers/apache/hive/operators/hive_stats.py
@@ -36,7 +36,9 @@ class HiveStatsCollectionOperator(BaseOperator):
"""
Gathers partition statistics using a dynamically generated Presto
query, inserts the stats into a MySql table with this format. Stats
- overwrite themselves if you rerun the same date/partition. ::
+ overwrite themselves if you rerun the same date/partition.
+
+ ::
CREATE TABLE hive_stats (
ds VARCHAR(16),
@@ -98,7 +100,7 @@ class HiveStatsCollectionOperator(BaseOperator):
self.dttm = "{{ execution_date.isoformat() }}"
def get_default_exprs(self, col: str, col_type: str) -> dict[Any, Any]:
- """Get default expressions"""
+ """Get default expressions."""
if col in self.excluded_columns:
return {}
exp = {(col, "non_null"): f"COUNT({col})"}
diff --git a/airflow/providers/apache/hive/transfers/vertica_to_hive.py b/airflow/providers/apache/hive/transfers/vertica_to_hive.py
index e0b41f2eb6..ce6233052d 100644
--- a/airflow/providers/apache/hive/transfers/vertica_to_hive.py
+++ b/airflow/providers/apache/hive/transfers/vertica_to_hive.py
@@ -96,8 +96,9 @@ class VerticaToHiveOperator(BaseOperator):
def type_map(cls, vertica_type):
"""
Vertica-python datatype.py does not provide the full type mapping access.
- Manual hack. Reference:
- https://github.com/uber/vertica-python/blob/master/vertica_python/vertica/column.py
+ Manual hack.
+
+ Reference: https://github.com/uber/vertica-python/blob/master/vertica_python/vertica/column.py
"""
type_map = {
5: "BOOLEAN",
diff --git a/airflow/providers/apache/kafka/hooks/base.py b/airflow/providers/apache/kafka/hooks/base.py
index 777beb116e..cc6f739ab9 100644
--- a/airflow/providers/apache/kafka/hooks/base.py
+++ b/airflow/providers/apache/kafka/hooks/base.py
@@ -26,7 +26,7 @@ from airflow.hooks.base import BaseHook
class KafkaBaseHook(BaseHook):
"""
- A base hook for interacting with Apache Kafka
+ A base hook for interacting with Apache Kafka.
:param kafka_config_id: The connection object to use, defaults to "kafka_default"
"""
@@ -37,14 +37,14 @@ class KafkaBaseHook(BaseHook):
hook_name = "Apache Kafka"
def __init__(self, kafka_config_id=default_conn_name, *args, **kwargs):
- """Initialize our Base"""
+ """Initialize our Base."""
super().__init__()
self.kafka_config_id = kafka_config_id
self.get_conn
@staticmethod
def get_ui_field_behaviour() -> dict[str, Any]:
- """Returns custom field behaviour"""
+ """Returns custom field behaviour."""
return {
"hidden_fields": ["schema", "login", "password", "port", "host"],
"relabeling": {"extra": "Config Dict"},
@@ -58,7 +58,7 @@ class KafkaBaseHook(BaseHook):
@cached_property
def get_conn(self) -> Any:
- """Get the configuration object"""
+ """Get the configuration object."""
config = self.get_connection(self.kafka_config_id).extra_dejson
if not (config.get("bootstrap.servers", None)):
@@ -67,7 +67,7 @@ class KafkaBaseHook(BaseHook):
return self._get_client(config)
def test_connection(self) -> tuple[bool, str]:
- """Test Connectivity from the UI"""
+ """Test Connectivity from the UI."""
try:
config = self.get_connection(self.kafka_config_id).extra_dejson
t = AdminClient(config, timeout=10).list_topics()
diff --git a/airflow/providers/apache/kafka/hooks/client.py b/airflow/providers/apache/kafka/hooks/client.py
index 1043a3edb5..b8379c983e 100644
--- a/airflow/providers/apache/kafka/hooks/client.py
+++ b/airflow/providers/apache/kafka/hooks/client.py
@@ -26,7 +26,7 @@ from airflow.providers.apache.kafka.hooks.base import KafkaBaseHook
class KafkaAdminClientHook(KafkaBaseHook):
"""
- A hook for interacting with the Kafka Cluster
+ A hook for interacting with the Kafka Cluster.
:param kafka_config_id: The connection object to use, defaults to "kafka_default"
"""
@@ -41,7 +41,7 @@ class KafkaAdminClientHook(KafkaBaseHook):
self,
topics: Sequence[Sequence[Any]],
) -> None:
- """Creates a topic
+ """Creates a topic.
:param topics: a list of topics to create including the number of partitions for the topic
and the replication factor. Format: [ ("topic_name", number of partitions, replication factor)]
diff --git a/airflow/providers/apache/kafka/hooks/consume.py b/airflow/providers/apache/kafka/hooks/consume.py
index 9ab0361067..d64859ffb1 100644
--- a/airflow/providers/apache/kafka/hooks/consume.py
+++ b/airflow/providers/apache/kafka/hooks/consume.py
@@ -25,7 +25,7 @@ from airflow.providers.apache.kafka.hooks.base import KafkaBaseHook
class KafkaConsumerHook(KafkaBaseHook):
"""
- A hook for creating a Kafka Consumer
+ A hook for creating a Kafka Consumer.
:param kafka_config_id: The connection object to use, defaults to "kafka_default"
:param topics: A list of topics to subscribe to.
diff --git a/airflow/providers/apache/kafka/hooks/produce.py b/airflow/providers/apache/kafka/hooks/produce.py
index 7e3a5bcf6e..79d648b5e1 100644
--- a/airflow/providers/apache/kafka/hooks/produce.py
+++ b/airflow/providers/apache/kafka/hooks/produce.py
@@ -23,7 +23,7 @@ from airflow.providers.apache.kafka.hooks.base import KafkaBaseHook
class KafkaProducerHook(KafkaBaseHook):
"""
- A hook for creating a Kafka Producer
+ A hook for creating a Kafka Producer.
:param kafka_config_id: The connection object to use, defaults to "kafka_default"
"""
@@ -35,7 +35,7 @@ class KafkaProducerHook(KafkaBaseHook):
return Producer(config)
def get_producer(self) -> Producer:
- """Returns a producer object for sending messages to Kafka"""
+ """Returns a producer object for sending messages to Kafka."""
producer = self.get_conn
self.log.info("Producer %s", producer)
diff --git a/airflow/providers/apache/kafka/operators/produce.py b/airflow/providers/apache/kafka/operators/produce.py
index c6a05436aa..30bf5e5450 100644
--- a/airflow/providers/apache/kafka/operators/produce.py
+++ b/airflow/providers/apache/kafka/operators/produce.py
@@ -38,7 +38,7 @@ def acked(err, msg):
class ProduceToTopicOperator(BaseOperator):
- """An operator that produces messages to a Kafka topic
+ """An operator that produces messages to a Kafka topic.
Registers a producer to a kafka topic and publishes messages to the log.
diff --git a/airflow/providers/apache/kafka/triggers/await_message.py b/airflow/providers/apache/kafka/triggers/await_message.py
index 7e7021f54b..5445579e12 100644
--- a/airflow/providers/apache/kafka/triggers/await_message.py
+++ b/airflow/providers/apache/kafka/triggers/await_message.py
@@ -29,7 +29,7 @@ from airflow.utils.module_loading import import_string
class AwaitMessageTrigger(BaseTrigger):
- """A trigger that waits for a message matching specific criteria to arrive in Kafka
+ """A trigger that waits for a message matching specific criteria to arrive in Kafka.
The behavior of the consumer of this trigger is as follows:
- poll the Kafka topics for a message, if no message returned, sleep
diff --git a/airflow/providers/apache/kylin/hooks/kylin.py b/airflow/providers/apache/kylin/hooks/kylin.py
index 709d4e7a28..a8865bcaf1 100644
--- a/airflow/providers/apache/kylin/hooks/kylin.py
+++ b/airflow/providers/apache/kylin/hooks/kylin.py
@@ -59,7 +59,7 @@ class KylinHook(BaseHook):
def cube_run(self, datasource_name, op, **op_args):
"""
- Run CubeSource command which in CubeSource.support_invoke_command
+ Run CubeSource command which in CubeSource.support_invoke_command.
:param datasource_name:
:param op: command
@@ -74,7 +74,7 @@ class KylinHook(BaseHook):
def get_job_status(self, job_id):
"""
- Get job status
+ Get job status.
:param job_id: kylin job id
:return: job status
diff --git a/airflow/providers/apache/kylin/operators/kylin_cube.py b/airflow/providers/apache/kylin/operators/kylin_cube.py
index 3f873cbaf5..7ca807d108 100644
--- a/airflow/providers/apache/kylin/operators/kylin_cube.py
+++ b/airflow/providers/apache/kylin/operators/kylin_cube.py
@@ -34,7 +34,7 @@ if TYPE_CHECKING:
class KylinCubeOperator(BaseOperator):
"""
This operator is used to submit request about kylin build/refresh/merge,
- and can track job status . so users can easier to build kylin job
+ and can track job status . so users can easier to build kylin job.
For more detail information in
`Apache Kylin <http://kylin.apache.org/>`_
diff --git a/airflow/providers/apache/livy/hooks/livy.py b/airflow/providers/apache/livy/hooks/livy.py
index 44d9cc8915..913388afe9 100644
--- a/airflow/providers/apache/livy/hooks/livy.py
+++ b/airflow/providers/apache/livy/hooks/livy.py
@@ -35,7 +35,7 @@ from airflow.utils.log.logging_mixin import LoggingMixin
class BatchState(Enum):
- """Batch session states"""
+ """Batch session states."""
NOT_STARTED = "not_started"
STARTING = "starting"
@@ -92,7 +92,7 @@ class LivyHook(HttpHook, LoggingMixin):
def get_conn(self, headers: dict[str, Any] | None = None) -> Any:
"""
- Returns http session for use with requests
+ Returns http session for use with requests.
:param headers: additional headers to be passed through as a dictionary
:return: requests session
@@ -111,7 +111,7 @@ class LivyHook(HttpHook, LoggingMixin):
retry_args: dict[str, Any] | None = None,
) -> Any:
"""
- Wrapper for HttpHook, allows to change method on the same HttpHook
+ Wrapper for HttpHook, allows to change method on the same HttpHook.
:param method: http method
:param endpoint: endpoint
@@ -146,7 +146,7 @@ class LivyHook(HttpHook, LoggingMixin):
def post_batch(self, *args: Any, **kwargs: Any) -> int:
"""
- Perform request to submit batch
+ Perform request to submit batch.
:return: batch session id
"""
@@ -179,7 +179,7 @@ class LivyHook(HttpHook, LoggingMixin):
def get_batch(self, session_id: int | str) -> dict:
"""
- Fetch info about the specified batch
+ Fetch info about the specified batch.
:param session_id: identifier of the batch sessions
:return: response body
@@ -201,7 +201,7 @@ class LivyHook(HttpHook, LoggingMixin):
def get_batch_state(self, session_id: int | str, retry_args: dict[str, Any] | None = None) -> BatchState:
"""
- Fetch the state of the specified batch
+ Fetch the state of the specified batch.
:param session_id: identifier of the batch sessions
:param retry_args: Arguments which define the retry behaviour.
@@ -230,7 +230,7 @@ class LivyHook(HttpHook, LoggingMixin):
def delete_batch(self, session_id: int | str) -> dict:
"""
- Delete the specified batch
+ Delete the specified batch.
:param session_id: identifier of the batch sessions
:return: response body
@@ -255,6 +255,7 @@ class LivyHook(HttpHook, LoggingMixin):
def get_batch_logs(self, session_id: int | str, log_start_position, log_batch_size) -> dict:
"""
Gets the session logs for a specified batch.
+
:param session_id: identifier of the batch sessions
:param log_start_position: Position from where to pull the logs
:param log_batch_size: Number of lines to pull in one batch
@@ -278,7 +279,7 @@ class LivyHook(HttpHook, LoggingMixin):
def dump_batch_logs(self, session_id: int | str) -> None:
"""
- Dumps the session logs for a specified batch
+ Dumps the session logs for a specified batch.
:param session_id: identifier of the batch sessions
:return: response body
@@ -300,7 +301,7 @@ class LivyHook(HttpHook, LoggingMixin):
@staticmethod
def _validate_session_id(session_id: int | str) -> None:
"""
- Validate session id is a int
+ Validate session id is a int.
:param session_id: session id
"""
@@ -312,7 +313,7 @@ class LivyHook(HttpHook, LoggingMixin):
@staticmethod
def _parse_post_response(response: dict[Any, Any]) -> int | None:
"""
- Parse batch response for batch id
+ Parse batch response for batch id.
:param response: response body
:return: session id
@@ -322,7 +323,7 @@ class LivyHook(HttpHook, LoggingMixin):
@staticmethod
def _parse_request_response(response: dict[Any, Any], parameter):
"""
- Parse batch response for batch id
+ Parse batch response for batch id.
:param response: response body
:return: value of parameter
@@ -454,7 +455,7 @@ class LivyHook(HttpHook, LoggingMixin):
class LivyAsyncHook(HttpAsyncHook, LoggingMixin):
"""
- Hook for Apache Livy through the REST API asynchronously
+ Hook for Apache Livy through the REST API asynchronously.
:param livy_conn_id: reference to a pre-defined Livy Connection.
:param extra_options: A dictionary of options passed to Livy.
@@ -497,7 +498,7 @@ class LivyAsyncHook(HttpAsyncHook, LoggingMixin):
extra_options: dict[str, Any] | None = None,
) -> Any:
"""
- Performs an asynchronous HTTP request call
+ Performs an asynchronous HTTP request call.
:param endpoint: the endpoint to be called i.e. resource/v1/query?
:param data: payload to be uploaded or request parameters
@@ -590,7 +591,7 @@ class LivyAsyncHook(HttpAsyncHook, LoggingMixin):
headers: dict[str, Any] | None = None,
) -> Any:
"""
- Wrapper for HttpAsyncHook, allows to change method on the same HttpAsyncHook
+ Wrapper for HttpAsyncHook, allows to change method on the same HttpAsyncHook.
:param method: http method
:param endpoint: endpoint
@@ -661,7 +662,7 @@ class LivyAsyncHook(HttpAsyncHook, LoggingMixin):
async def dump_batch_logs(self, session_id: int | str) -> Any:
"""
- Dumps the session logs for a specified batch asynchronously
+ Dumps the session logs for a specified batch asynchronously.
:param session_id: identifier of the batch sessions
:return: response body
@@ -687,7 +688,7 @@ class LivyAsyncHook(HttpAsyncHook, LoggingMixin):
@staticmethod
def _validate_session_id(session_id: int | str) -> None:
"""
- Validate session id is a int
+ Validate session id is a int.
:param session_id: session id
"""
@@ -699,7 +700,7 @@ class LivyAsyncHook(HttpAsyncHook, LoggingMixin):
@staticmethod
def _parse_post_response(response: dict[Any, Any]) -> Any:
"""
- Parse batch response for batch id
+ Parse batch response for batch id.
:param response: response body
:return: session id
@@ -709,7 +710,7 @@ class LivyAsyncHook(HttpAsyncHook, LoggingMixin):
@staticmethod
def _parse_request_response(response: dict[Any, Any], parameter: Any) -> Any:
"""
- Parse batch response for batch id
+ Parse batch response for batch id.
:param response: response body
:return: value of parameter
diff --git a/airflow/providers/apache/livy/triggers/livy.py b/airflow/providers/apache/livy/triggers/livy.py
index 17c5567679..71e4d53b20 100644
--- a/airflow/providers/apache/livy/triggers/livy.py
+++ b/airflow/providers/apache/livy/triggers/livy.py
@@ -27,7 +27,7 @@ from airflow.triggers.base import BaseTrigger, TriggerEvent
class LivyTrigger(BaseTrigger):
"""
- Check for the state of a previously submitted job with batch_id
+ Check for the state of a previously submitted job with batch_id.
:param batch_id: Batch job id
:param spark_params: Spark parameters; for example,
@@ -82,7 +82,7 @@ class LivyTrigger(BaseTrigger):
"""
Checks if the _polling_interval > 0, in that case it pools Livy for
batch termination asynchronously.
- else returns the success response
+ else returns the success response.
"""
try:
if self._polling_interval > 0:
diff --git a/airflow/providers/apache/pig/hooks/pig.py b/airflow/providers/apache/pig/hooks/pig.py
index 023b308e13..71c39536d3 100644
--- a/airflow/providers/apache/pig/hooks/pig.py
+++ b/airflow/providers/apache/pig/hooks/pig.py
@@ -57,7 +57,7 @@ class PigCliHook(BaseHook):
def run_cli(self, pig: str, pig_opts: str | None = None, verbose: bool = True) -> Any:
"""
- Run a pig script using the pig cli
+ Run a pig script using the pig cli.
>>> ph = PigCliHook()
>>> result = ph.run_cli("ls /;", pig_opts="-x mapreduce")
@@ -101,7 +101,7 @@ class PigCliHook(BaseHook):
return stdout
def kill(self) -> None:
- """Kill Pig job"""
+ """Kill Pig job."""
if self.sub_process:
if self.sub_process.poll() is None:
self.log.info("Killing the Pig job")
diff --git a/airflow/providers/apache/pinot/hooks/pinot.py b/airflow/providers/apache/pinot/hooks/pinot.py
index 909b0182dd..caf0709671 100644
--- a/airflow/providers/apache/pinot/hooks/pinot.py
+++ b/airflow/providers/apache/pinot/hooks/pinot.py
@@ -84,7 +84,7 @@ class PinotAdminHook(BaseHook):
def add_schema(self, schema_file: str, with_exec: bool = True) -> Any:
"""
- Add Pinot schema by run AddSchema command
+ Add Pinot schema by run AddSchema command.
:param schema_file: Pinot schema file
:param with_exec: bool
@@ -99,7 +99,7 @@ class PinotAdminHook(BaseHook):
def add_table(self, file_path: str, with_exec: bool = True) -> Any:
"""
- Add Pinot table with run AddTable command
+ Add Pinot table with run AddTable command.
:param file_path: Pinot table configure file
:param with_exec: bool
@@ -133,7 +133,7 @@ class PinotAdminHook(BaseHook):
post_creation_verification: str | None = None,
retry: str | None = None,
) -> Any:
- """Create Pinot segment by run CreateSegment command"""
+ """Create Pinot segment by run CreateSegment command."""
cmd = ["CreateSegment"]
if generator_config_file:
@@ -194,7 +194,7 @@ class PinotAdminHook(BaseHook):
def upload_segment(self, segment_dir: str, table_name: str | None = None) -> Any:
"""
- Upload Segment with run UploadSegment command
+ Upload Segment with run UploadSegment command.
:param segment_dir:
:param table_name:
@@ -210,7 +210,7 @@ class PinotAdminHook(BaseHook):
def run_cli(self, cmd: list[str], verbose: bool = True) -> str:
"""
- Run command with pinot-admin.sh
+ Run command with pinot-admin.sh.
:param cmd: List of command going to be run by pinot-admin.sh script
:param verbose:
@@ -249,7 +249,7 @@ class PinotAdminHook(BaseHook):
class PinotDbApiHook(DbApiHook):
"""
- Interact with Pinot Broker Query API
+ Interact with Pinot Broker Query API.
This hook uses standard-SQL endpoint since PQL endpoint is soon to be
deprecated.
https://docs.pinot.apache.org/users/api/querying-pinot-using-standard-sql
diff --git a/airflow/providers/apache/spark/hooks/spark_jdbc.py b/airflow/providers/apache/spark/hooks/spark_jdbc.py
index c568f77bf5..60ed0ebe57 100644
--- a/airflow/providers/apache/spark/hooks/spark_jdbc.py
+++ b/airflow/providers/apache/spark/hooks/spark_jdbc.py
@@ -223,7 +223,7 @@ class SparkJDBCHook(SparkSubmitHook):
return arguments
def submit_jdbc_job(self) -> None:
- """Submit Spark JDBC job"""
+ """Submit Spark JDBC job."""
self._application_args = self._build_jdbc_application_arguments(self._jdbc_connection)
self.submit(application=f"{os.path.dirname(os.path.abspath(__file__))}/spark_jdbc_script.py")
diff --git a/airflow/providers/apache/spark/hooks/spark_jdbc_script.py b/airflow/providers/apache/spark/hooks/spark_jdbc_script.py
index 2cc10584d7..d431782929 100644
--- a/airflow/providers/apache/spark/hooks/spark_jdbc_script.py
+++ b/airflow/providers/apache/spark/hooks/spark_jdbc_script.py
@@ -35,7 +35,7 @@ def set_common_options(
driver: str = "driver",
) -> Any:
"""
- Get Spark source from JDBC connection
+ Get Spark source from JDBC connection.
:param spark_source: Spark source, here is Spark reader or writer
:param url: JDBC resource url
@@ -69,7 +69,7 @@ def spark_write_to_jdbc(
num_partitions: int,
create_table_column_types: str,
) -> None:
- """Transfer data from Spark to JDBC source"""
+ """Transfer data from Spark to JDBC source."""
writer = spark_session.table(metastore_table).write
# first set common options
writer = set_common_options(writer, url, jdbc_table, user, password, driver)
@@ -103,7 +103,7 @@ def spark_read_from_jdbc(
lower_bound: str,
upper_bound: str,
) -> None:
- """Transfer data from JDBC source to Spark"""
+ """Transfer data from JDBC source to Spark."""
# first set common options
reader = set_common_options(spark_session.read, url, jdbc_table, user, password, driver)
diff --git a/airflow/providers/apache/spark/hooks/spark_sql.py b/airflow/providers/apache/spark/hooks/spark_sql.py
index d6f8f56c27..685c3330d1 100644
--- a/airflow/providers/apache/spark/hooks/spark_sql.py
+++ b/airflow/providers/apache/spark/hooks/spark_sql.py
@@ -162,7 +162,7 @@ class SparkSqlHook(BaseHook):
def run_query(self, cmd: str = "", **kwargs: Any) -> None:
"""
- Remote Popen (actually execute the Spark-sql query)
+ Remote Popen (actually execute the Spark-sql query).
:param cmd: command to append to the spark-sql command
:param kwargs: extra arguments to Popen (see subprocess.Popen)
@@ -185,7 +185,7 @@ class SparkSqlHook(BaseHook):
)
def kill(self) -> None:
- """Kill Spark job"""
+ """Kill Spark job."""
if self._sp and self._sp.poll() is None:
self.log.info("Killing the Spark-Sql job")
self._sp.kill()
diff --git a/airflow/providers/apache/spark/hooks/spark_submit.py b/airflow/providers/apache/spark/hooks/spark_submit.py
index 842df0e28a..e3fcce7398 100644
--- a/airflow/providers/apache/spark/hooks/spark_submit.py
+++ b/airflow/providers/apache/spark/hooks/spark_submit.py
@@ -88,7 +88,7 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
@staticmethod
def get_ui_field_behaviour() -> dict[str, Any]:
- """Returns custom field behaviour"""
+ """Returns custom field behaviour."""
return {
"hidden_fields": ["schema", "login", "password"],
"relabeling": {},
@@ -169,7 +169,7 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
"""
Determines whether this hook should poll the spark driver status through
subsequent spark-submit status requests after the initial spark-submit request
- :return: if the driver status should be tracked
+ :return: if the driver status should be tracked.
"""
return "spark://" in self._connection["master"] and self._connection["deploy_mode"] == "cluster"
@@ -385,7 +385,7 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
def submit(self, application: str = "", **kwargs: Any) -> None:
"""
- Remote Popen to execute the spark-submit job
+ Remote Popen to execute the spark-submit job.
:param application: Submitted application, jar or py file
:param kwargs: extra arguments to Popen (see subprocess.Popen)
@@ -491,7 +491,7 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
def _process_spark_status_log(self, itr: Iterator[Any]) -> None:
"""
- Parses the logs of the spark driver status query process
+ Parses the logs of the spark driver status query process.
:param itr: An iterator which iterates over the input of the subprocess
"""
@@ -582,6 +582,7 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
def _build_spark_driver_kill_command(self) -> list[str]:
"""
Construct the spark-submit command to kill a driver.
+
:return: full command to kill a driver
"""
# Assume that spark-submit is present in the path to the executing user
@@ -599,7 +600,7 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
return connection_cmd
def on_kill(self) -> None:
- """Kill Spark submit command"""
+ """Kill Spark submit command."""
self.log.debug("Kill Command is being called")
if self._should_track_driver_status and self._driver_id:
diff --git a/airflow/providers/apache/spark/operators/spark_jdbc.py b/airflow/providers/apache/spark/operators/spark_jdbc.py
index 7dd035db40..29312d8a7c 100644
--- a/airflow/providers/apache/spark/operators/spark_jdbc.py
+++ b/airflow/providers/apache/spark/operators/spark_jdbc.py
@@ -159,7 +159,7 @@ class SparkJDBCOperator(SparkSubmitOperator):
self._hook: SparkJDBCHook | None = None
def execute(self, context: Context) -> None:
- """Call the SparkSubmitHook to run the provided spark job"""
+ """Call the SparkSubmitHook to run the provided spark job."""
if self._hook is None:
self._hook = self._get_hook()
self._hook.submit_jdbc_job()
diff --git a/airflow/providers/apache/spark/operators/spark_sql.py b/airflow/providers/apache/spark/operators/spark_sql.py
index a5150d281a..47487fa384 100644
--- a/airflow/providers/apache/spark/operators/spark_sql.py
+++ b/airflow/providers/apache/spark/operators/spark_sql.py
@@ -28,7 +28,7 @@ if TYPE_CHECKING:
class SparkSqlOperator(BaseOperator):
"""
- Execute Spark SQL query
+ Execute Spark SQL query.
.. seealso::
For more information on how to use this operator, take a look at the guide:
@@ -91,7 +91,7 @@ class SparkSqlOperator(BaseOperator):
self._hook: SparkSqlHook | None = None
def execute(self, context: Context) -> None:
- """Call the SparkSqlHook to run the provided sql query"""
+ """Call the SparkSqlHook to run the provided sql query."""
if self._hook is None:
self._hook = self._get_hook()
self._hook.run_query()
@@ -102,7 +102,7 @@ class SparkSqlOperator(BaseOperator):
self._hook.kill()
def _get_hook(self) -> SparkSqlHook:
- """Get SparkSqlHook"""
+ """Get SparkSqlHook."""
return SparkSqlHook(
sql=self._sql,
conf=self._conf,
diff --git a/airflow/providers/apache/spark/operators/spark_submit.py b/airflow/providers/apache/spark/operators/spark_submit.py
index f7ca92815a..e335f056b1 100644
--- a/airflow/providers/apache/spark/operators/spark_submit.py
+++ b/airflow/providers/apache/spark/operators/spark_submit.py
@@ -151,7 +151,7 @@ class SparkSubmitOperator(BaseOperator):
self._conn_id = conn_id
def execute(self, context: Context) -> None:
- """Call the SparkSubmitHook to run the provided spark job"""
+ """Call the SparkSubmitHook to run the provided spark job."""
if self._hook is None:
self._hook = self._get_hook()
self._hook.submit(self._application)
diff --git a/airflow/providers/apache/sqoop/hooks/sqoop.py b/airflow/providers/apache/sqoop/hooks/sqoop.py
index fafabca09d..a38a58b48f 100644
--- a/airflow/providers/apache/sqoop/hooks/sqoop.py
+++ b/airflow/providers/apache/sqoop/hooks/sqoop.py
@@ -15,7 +15,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-"""This module contains a sqoop 1.x hook"""
+"""This module contains a sqoop 1.x hook."""
from __future__ import annotations
import subprocess
@@ -85,7 +85,7 @@ class SqoopHook(BaseHook):
return self.conn
def cmd_mask_password(self, cmd_orig: list[str]) -> list[str]:
- """Mask command password for safety"""
+ """Mask command password for safety."""
cmd = deepcopy(cmd_orig)
try:
password_index = cmd.index("--password")
@@ -96,7 +96,7 @@ class SqoopHook(BaseHook):
def popen(self, cmd: list[str], **kwargs: Any) -> None:
"""
- Remote Popen
+ Remote Popen.
:param cmd: command to remotely execute
:param kwargs: extra arguments to Popen (see subprocess.Popen)
@@ -225,7 +225,7 @@ class SqoopHook(BaseHook):
) -> Any:
"""
Imports table from remote location to target dir. Arguments are
- copies of direct sqoop command line arguments
+ copies of direct sqoop command line arguments.
:param table: Table to read
:param schema: Schema name
@@ -267,7 +267,7 @@ class SqoopHook(BaseHook):
extra_import_options: dict[str, Any] | None = None,
) -> Any:
"""
- Imports a specific query from the rdbms to hdfs
+ Imports a specific query from the rdbms to hdfs.
:param query: Free format query to run
:param target_dir: HDFS destination dir
@@ -377,7 +377,7 @@ class SqoopHook(BaseHook):
) -> None:
"""
Exports Hive table to remote location. Arguments are copies of direct
- sqoop command line Arguments
+ sqoop command line Arguments.
:param table: Table remote destination
:param schema: Schema name
diff --git a/airflow/providers/apache/sqoop/operators/sqoop.py b/airflow/providers/apache/sqoop/operators/sqoop.py
index 74def0ff18..c421a53dd4 100644
--- a/airflow/providers/apache/sqoop/operators/sqoop.py
+++ b/airflow/providers/apache/sqoop/operators/sqoop.py
@@ -15,7 +15,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-"""This module contains a sqoop 1 operator"""
+"""This module contains a sqoop 1 operator."""
from __future__ import annotations
import os
@@ -33,8 +33,8 @@ if TYPE_CHECKING:
class SqoopOperator(BaseOperator):
"""
Execute a Sqoop job.
- Documentation for Apache Sqoop can be found here:
- https://sqoop.apache.org/docs/1.4.2/SqoopUserGuide.html
+
+ Documentation for Apache Sqoop can be found here: https://sqoop.apache.org/docs/1.4.2/SqoopUserGuide.html
:param conn_id: str
:param cmd_type: str specify command to execute "export" or "import"
@@ -192,7 +192,7 @@ class SqoopOperator(BaseOperator):
self.libjars = libjars
def execute(self, context: Context) -> None:
- """Execute sqoop job"""
+ """Execute sqoop job."""
if self.hook is None:
self.hook = self._get_hook()