This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new f3e87c5 Add D202 pydocstyle check (#11032)
f3e87c5 is described below
commit f3e87c503081a3085dff6c7352640d7f08beb5bc
Author: Patrick Cando <[email protected]>
AuthorDate: Tue Sep 22 16:17:24 2020 +0100
Add D202 pydocstyle check (#11032)
---
.pre-commit-config.yaml | 2 +-
airflow/api/auth/backend/kerberos_auth.py | 1 -
.../api/common/experimental/get_dag_run_state.py | 1 -
.../api_connexion/endpoints/dag_run_endpoint.py | 1 -
.../api_connexion/endpoints/event_log_endpoint.py | 1 -
.../endpoints/import_error_endpoint.py | 1 -
airflow/api_connexion/endpoints/pool_endpoint.py | 1 -
airflow/api_connexion/endpoints/xcom_endpoint.py | 1 -
airflow/api_connexion/schemas/common_schema.py | 2 --
airflow/api_connexion/schemas/dag_schema.py | 1 -
airflow/cli/commands/dag_command.py | 1 -
airflow/cli/commands/task_command.py | 2 --
airflow/configuration.py | 1 -
airflow/executors/celery_executor.py | 2 --
airflow/hooks/dbapi_hook.py | 2 --
airflow/jobs/backfill_job.py | 1 -
airflow/jobs/local_task_job.py | 1 -
airflow/jobs/scheduler_job.py | 1 -
airflow/kubernetes/kube_client.py | 1 -
airflow/kubernetes/pod_generator.py | 1 -
airflow/macros/__init__.py | 1 -
...a59344a4_make_taskinstance_pool_not_nullable.py | 1 -
.../8f966b9c467a_set_conn_type_as_non_nullable.py | 1 -
airflow/models/baseoperator.py | 4 ---
airflow/models/dag.py | 2 --
airflow/models/dagbag.py | 1 -
airflow/models/dagrun.py | 2 --
airflow/models/taskinstance.py | 5 ----
airflow/models/variable.py | 1 -
airflow/operators/sql.py | 1 -
.../providers/amazon/aws/hooks/cloud_formation.py | 3 ---
airflow/providers/amazon/aws/hooks/emr.py | 2 --
airflow/providers/amazon/aws/hooks/glue_catalog.py | 2 --
airflow/providers/amazon/aws/hooks/kinesis.py | 1 -
.../providers/amazon/aws/hooks/lambda_function.py | 1 -
airflow/providers/amazon/aws/hooks/logs.py | 1 -
airflow/providers/amazon/aws/hooks/s3.py | 8 ------
airflow/providers/amazon/aws/hooks/sagemaker.py | 20 ---------------
airflow/providers/amazon/aws/operators/sqs.py | 1 -
.../providers/amazon/aws/transfers/s3_to_sftp.py | 1 -
.../providers/amazon/aws/transfers/sftp_to_s3.py | 1 -
.../apache/druid/transfers/hive_to_druid.py | 1 -
airflow/providers/apache/hive/hooks/hive.py | 1 -
airflow/providers/apache/pig/hooks/pig.py | 1 -
.../apache/spark/hooks/spark_jdbc_script.py | 2 --
.../providers/apache/spark/hooks/spark_submit.py | 3 ---
.../providers/cncf/kubernetes/hooks/kubernetes.py | 1 -
.../providers/elasticsearch/log/es_task_handler.py | 1 -
airflow/providers/google/cloud/hooks/bigquery.py | 4 ---
airflow/providers/google/cloud/hooks/bigtable.py | 2 --
.../cloud/hooks/cloud_storage_transfer_service.py | 1 -
airflow/providers/google/cloud/hooks/dataprep.py | 3 ---
airflow/providers/google/cloud/hooks/dlp.py | 30 ----------------------
airflow/providers/google/cloud/hooks/gcs.py | 5 ----
.../google/cloud/hooks/kubernetes_engine.py | 2 --
.../providers/google/cloud/hooks/stackdriver.py | 6 -----
airflow/providers/google/cloud/hooks/tasks.py | 13 ----------
.../operators/cloud_storage_transfer_service.py | 1 -
.../providers/google/cloud/operators/mlengine.py | 1 -
airflow/providers/google/cloud/operators/pubsub.py | 1 -
airflow/providers/google/cloud/sensors/pubsub.py | 1 -
.../google/marketing_platform/hooks/analytics.py | 5 ----
.../marketing_platform/hooks/display_video.py | 6 -----
airflow/providers/imap/hooks/imap.py | 1 -
airflow/providers/jdbc/hooks/jdbc.py | 1 -
.../providers/microsoft/azure/hooks/azure_batch.py | 2 --
airflow/providers/microsoft/azure/hooks/wasb.py | 1 -
airflow/providers/mysql/hooks/mysql.py | 1 -
airflow/providers/redis/operators/redis_publish.py | 1 -
airflow/providers/salesforce/hooks/salesforce.py | 1 -
airflow/providers/sendgrid/utils/emailer.py | 1 -
airflow/providers/slack/hooks/slack.py | 1 -
airflow/providers/ssh/hooks/ssh.py | 2 --
airflow/serialization/serialized_objects.py | 1 -
airflow/settings.py | 1 -
airflow/ti_deps/deps/trigger_rule_dep.py | 1 -
airflow/utils/cli.py | 1 -
airflow/utils/dag_processing.py | 3 ---
airflow/utils/dates.py | 1 -
airflow/utils/db.py | 1 -
airflow/utils/decorators.py | 1 -
airflow/utils/email.py | 1 -
airflow/utils/file.py | 3 ---
airflow/utils/log/log_reader.py | 5 ----
airflow/utils/timezone.py | 2 --
airflow/www/api/experimental/endpoints.py | 4 ---
airflow/www/utils.py | 1 -
airflow/www/views.py | 2 --
88 files changed, 1 insertion(+), 215 deletions(-)
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 1512bc9..cc9c811 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -200,7 +200,7 @@ repos:
name: Run pydocstyle
args:
- --convention=pep257
- - --add-ignore=D100,D102,D104,D105,D107,D200,D202,D205,D400,D401
+ - --add-ignore=D100,D102,D104,D105,D107,D200,D205,D400,D401
exclude:
^tests/.*\.py$|^scripts/.*\.py$|^dev|^backport_packages|^kubernetes_tests
- repo: local
hooks:
diff --git a/airflow/api/auth/backend/kerberos_auth.py
b/airflow/api/auth/backend/kerberos_auth.py
index 8285b12..d347484 100644
--- a/airflow/api/auth/backend/kerberos_auth.py
+++ b/airflow/api/auth/backend/kerberos_auth.py
@@ -72,7 +72,6 @@ _KERBEROS_SERVICE = KerberosService()
def init_app(app):
"""Initializes application with kerberos"""
-
hostname = app.config.get('SERVER_NAME')
if not hostname:
hostname = getfqdn()
diff --git a/airflow/api/common/experimental/get_dag_run_state.py
b/airflow/api/common/experimental/get_dag_run_state.py
index cdc518d..7be4541 100644
--- a/airflow/api/common/experimental/get_dag_run_state.py
+++ b/airflow/api/common/experimental/get_dag_run_state.py
@@ -29,7 +29,6 @@ def get_dag_run_state(dag_id: str, execution_date: datetime)
-> Dict[str, str]:
:param execution_date: execution date
:return: Dictionary storing state of the object
"""
-
dag = check_and_get_dag(dag_id=dag_id)
dagrun = check_and_get_dagrun(dag, execution_date)
diff --git a/airflow/api_connexion/endpoints/dag_run_endpoint.py
b/airflow/api_connexion/endpoints/dag_run_endpoint.py
index 0bba126..8e8f8cb 100644
--- a/airflow/api_connexion/endpoints/dag_run_endpoint.py
+++ b/airflow/api_connexion/endpoints/dag_run_endpoint.py
@@ -87,7 +87,6 @@ def get_dag_runs(
"""
Get all DAG Runs.
"""
-
query = session.query(DagRun)
# This endpoint allows specifying ~ as the dag_id to retrieve DAG Runs
for all DAGs.
diff --git a/airflow/api_connexion/endpoints/event_log_endpoint.py
b/airflow/api_connexion/endpoints/event_log_endpoint.py
index 5ce4400..105794e 100644
--- a/airflow/api_connexion/endpoints/event_log_endpoint.py
+++ b/airflow/api_connexion/endpoints/event_log_endpoint.py
@@ -49,7 +49,6 @@ def get_event_logs(session, limit, offset=None):
"""
Get all log entries from event log
"""
-
total_entries = session.query(func.count(Log.id)).scalar()
event_logs =
session.query(Log).order_by(Log.id).offset(offset).limit(limit).all()
return event_log_collection_schema.dump(
diff --git a/airflow/api_connexion/endpoints/import_error_endpoint.py
b/airflow/api_connexion/endpoints/import_error_endpoint.py
index 10ab501..5f61b70 100644
--- a/airflow/api_connexion/endpoints/import_error_endpoint.py
+++ b/airflow/api_connexion/endpoints/import_error_endpoint.py
@@ -52,7 +52,6 @@ def get_import_errors(session, limit, offset=None):
"""
Get all import errors
"""
-
total_entries = session.query(func.count(ImportError.id)).scalar()
import_errors =
session.query(ImportError).order_by(ImportError.id).offset(offset).limit(limit).all()
return import_error_collection_schema.dump(
diff --git a/airflow/api_connexion/endpoints/pool_endpoint.py
b/airflow/api_connexion/endpoints/pool_endpoint.py
index b9c51b0..fc363d7 100644
--- a/airflow/api_connexion/endpoints/pool_endpoint.py
+++ b/airflow/api_connexion/endpoints/pool_endpoint.py
@@ -60,7 +60,6 @@ def get_pools(session, limit, offset=None):
"""
Get all pools
"""
-
total_entries = session.query(func.count(Pool.id)).scalar()
pools =
session.query(Pool).order_by(Pool.id).offset(offset).limit(limit).all()
return pool_collection_schema.dump(PoolCollection(pools=pools,
total_entries=total_entries))
diff --git a/airflow/api_connexion/endpoints/xcom_endpoint.py
b/airflow/api_connexion/endpoints/xcom_endpoint.py
index af0cf1c..b232d66 100644
--- a/airflow/api_connexion/endpoints/xcom_endpoint.py
+++ b/airflow/api_connexion/endpoints/xcom_endpoint.py
@@ -47,7 +47,6 @@ def get_xcom_entries(
"""
Get all XCom values
"""
-
query = session.query(XCom)
if dag_id != '~':
query = query.filter(XCom.dag_id == dag_id)
diff --git a/airflow/api_connexion/schemas/common_schema.py
b/airflow/api_connexion/schemas/common_schema.py
index 7f1020c..4d49203 100644
--- a/airflow/api_connexion/schemas/common_schema.py
+++ b/airflow/api_connexion/schemas/common_schema.py
@@ -45,7 +45,6 @@ class TimeDeltaSchema(Schema):
@marshmallow.post_load
def make_time_delta(self, data, **kwargs):
"""Create time delta based on data"""
-
if "objectType" in data:
del data["objectType"]
return datetime.timedelta(**data)
@@ -74,7 +73,6 @@ class RelativeDeltaSchema(Schema):
@marshmallow.post_load
def make_relative_delta(self, data, **kwargs):
"""Create relative delta based on data"""
-
if "objectType" in data:
del data["objectType"]
diff --git a/airflow/api_connexion/schemas/dag_schema.py
b/airflow/api_connexion/schemas/dag_schema.py
index 7a50799..1304bb7 100644
--- a/airflow/api_connexion/schemas/dag_schema.py
+++ b/airflow/api_connexion/schemas/dag_schema.py
@@ -56,7 +56,6 @@ class DAGSchema(SQLAlchemySchema):
@staticmethod
def get_owners(obj: DagModel):
"""Convert owners attribute to DAG representation"""
-
if not getattr(obj, 'owners', None):
return []
return obj.owners.split(",")
diff --git a/airflow/cli/commands/dag_command.py
b/airflow/cli/commands/dag_command.py
index 02b8b74..1eab5bc 100644
--- a/airflow/cli/commands/dag_command.py
+++ b/airflow/cli/commands/dag_command.py
@@ -383,7 +383,6 @@ def dag_list_dag_runs(args, dag=None):
@cli_utils.action_logging
def generate_pod_yaml(args):
"""Generates yaml files for each task in the DAG. Used for testing output
of KubernetesExecutor"""
-
from kubernetes.client.api_client import ApiClient
from airflow.executors.kubernetes_executor import
AirflowKubernetesScheduler, KubeConfig
diff --git a/airflow/cli/commands/task_command.py
b/airflow/cli/commands/task_command.py
index a6a4624..ec72ae1 100644
--- a/airflow/cli/commands/task_command.py
+++ b/airflow/cli/commands/task_command.py
@@ -143,7 +143,6 @@ def _run_raw_task(args, ti):
@cli_utils.action_logging
def task_run(args, dag=None):
"""Runs a single task instance"""
-
# Load custom airflow config
if args.cfg_path:
with open(args.cfg_path, 'r') as conf_file:
@@ -289,7 +288,6 @@ def _guess_debugger():
* `ipdb <https://github.com/gotcha/ipdb>`__
* `pdb <https://docs.python.org/3/library/pdb.html>`__
"""
-
for mod in SUPPORTED_DEBUGGER_MODULES:
try:
return importlib.import_module(mod)
diff --git a/airflow/configuration.py b/airflow/configuration.py
index 5d878f4..d41366b 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -228,7 +228,6 @@ class AirflowConfigParser(ConfigParser): # pylint:
disable=too-many-ancestors
Validate that config values aren't invalid given other config values
or system-level limitations and requirements.
"""
-
if (
self.get("core", "executor") not in ('DebugExecutor',
'SequentialExecutor') and
"sqlite" in self.get('core', 'sql_alchemy_conn')):
diff --git a/airflow/executors/celery_executor.py
b/airflow/executors/celery_executor.py
index de5a733..7a41693 100644
--- a/airflow/executors/celery_executor.py
+++ b/airflow/executors/celery_executor.py
@@ -282,7 +282,6 @@ class CeleryExecutor(BaseExecutor):
def update_all_task_states(self) -> None:
"""Updates states of the tasks."""
-
self.log.debug("Inquiring about %s celery task(s)", len(self.tasks))
state_and_info_by_celery_task_id =
self.bulk_state_fetcher.get_many(self.tasks.values())
@@ -401,7 +400,6 @@ def fetch_celery_task_state(async_result: AsyncResult) -> \
of the task
:rtype: tuple[str, str, str]
"""
-
try:
with timeout(seconds=OPERATION_TIMEOUT):
# Accessing state property of celery task will make actual network
request
diff --git a/airflow/hooks/dbapi_hook.py b/airflow/hooks/dbapi_hook.py
index 42736bb..41b660f 100644
--- a/airflow/hooks/dbapi_hook.py
+++ b/airflow/hooks/dbapi_hook.py
@@ -222,7 +222,6 @@ class DbApiHook(BaseHook):
:return: connection autocommit setting.
:rtype: bool
"""
-
return getattr(conn, 'autocommit', False) and self.supports_autocommit
def get_cursor(self):
@@ -322,7 +321,6 @@ class DbApiHook(BaseHook):
:return: The serialized cell
:rtype: str
"""
-
if cell is None:
return None
if isinstance(cell, datetime):
diff --git a/airflow/jobs/backfill_job.py b/airflow/jobs/backfill_job.py
index 92240eb..c606119 100644
--- a/airflow/jobs/backfill_job.py
+++ b/airflow/jobs/backfill_job.py
@@ -412,7 +412,6 @@ class BackfillJob(BaseJob):
:return: the list of execution_dates for the finished dag runs
:rtype: list
"""
-
executed_run_dates = []
while ((len(ti_status.to_run) > 0 or len(ti_status.running) > 0) and
diff --git a/airflow/jobs/local_task_job.py b/airflow/jobs/local_task_job.py
index 4b7c6d6..176a3ab 100644
--- a/airflow/jobs/local_task_job.py
+++ b/airflow/jobs/local_task_job.py
@@ -129,7 +129,6 @@ class LocalTaskJob(BaseJob):
@provide_session
def heartbeat_callback(self, session=None):
"""Self destruct task if state has been moved away from running
externally"""
-
if self.terminating:
# ensure termination if processes are created later
self.task_runner.terminate()
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 83c731d..0c1c170 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -691,7 +691,6 @@ class DagFileProcessor(LoggingMixin):
active DAG runs and adding task instances that should run to the
queue.
"""
-
# update the state of the previously active dag runs
active_dag_runs = 0
task_instances_list = []
diff --git a/airflow/kubernetes/kube_client.py
b/airflow/kubernetes/kube_client.py
index 1394138..77f61ca 100644
--- a/airflow/kubernetes/kube_client.py
+++ b/airflow/kubernetes/kube_client.py
@@ -80,7 +80,6 @@ def get_kube_client(in_cluster: bool =
conf.getboolean('kubernetes', 'in_cluster
:return kubernetes client
:rtype client.CoreV1Api
"""
-
if not has_kubernetes:
raise _import_err
diff --git a/airflow/kubernetes/pod_generator.py
b/airflow/kubernetes/pod_generator.py
index c8c39a7..088c7c8 100644
--- a/airflow/kubernetes/pod_generator.py
+++ b/airflow/kubernetes/pod_generator.py
@@ -179,7 +179,6 @@ class PodGenerator:
@staticmethod
def from_obj(obj) -> Optional[Union[dict, k8s.V1Pod]]:
"""Converts to pod from obj"""
-
if obj is None:
return None
diff --git a/airflow/macros/__init__.py b/airflow/macros/__init__.py
index c0fbc5f..b6efaf5 100644
--- a/airflow/macros/__init__.py
+++ b/airflow/macros/__init__.py
@@ -40,7 +40,6 @@ def ds_add(ds, days):
>>> ds_add('2015-01-06', -5)
'2015-01-01'
"""
-
ds = datetime.strptime(ds, '%Y-%m-%d')
if days:
ds = ds + timedelta(days)
diff --git
a/airflow/migrations/versions/6e96a59344a4_make_taskinstance_pool_not_nullable.py
b/airflow/migrations/versions/6e96a59344a4_make_taskinstance_pool_not_nullable.py
index 1504682..ae7ac85 100644
---
a/airflow/migrations/versions/6e96a59344a4_make_taskinstance_pool_not_nullable.py
+++
b/airflow/migrations/versions/6e96a59344a4_make_taskinstance_pool_not_nullable.py
@@ -112,7 +112,6 @@ def downgrade():
"""
Make TaskInstance.pool field nullable.
"""
-
conn = op.get_bind()
if conn.dialect.name == "mssql":
op.drop_index('ti_pool', table_name='task_instance')
diff --git
a/airflow/migrations/versions/8f966b9c467a_set_conn_type_as_non_nullable.py
b/airflow/migrations/versions/8f966b9c467a_set_conn_type_as_non_nullable.py
index 8ebb8af..c912c3d 100644
--- a/airflow/migrations/versions/8f966b9c467a_set_conn_type_as_non_nullable.py
+++ b/airflow/migrations/versions/8f966b9c467a_set_conn_type_as_non_nullable.py
@@ -37,7 +37,6 @@ depends_on = None
def upgrade():
"""Apply Set conn_type as non-nullable"""
-
Base = declarative_base()
class Connection(Base):
diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index d428880..7002379 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -734,7 +734,6 @@ class BaseOperator(Operator, LoggingMixin, TaskMixin,
metaclass=BaseOperatorMeta
@cached_property
def operator_extra_link_dict(self) -> Dict[str, Any]:
"""Returns dictionary of all extra links for the operator"""
-
op_extra_links_from_plugin: Dict[str, Any] = {}
from airflow import plugins_manager
plugins_manager.initialize_extra_operators_links_plugins()
@@ -831,7 +830,6 @@ class BaseOperator(Operator, LoggingMixin, TaskMixin,
metaclass=BaseOperatorMeta
:param jinja_env: Jinja environment
:type jinja_env: jinja2.Environment
"""
-
if not jinja_env:
jinja_env = self.get_template_env()
@@ -866,7 +864,6 @@ class BaseOperator(Operator, LoggingMixin, TaskMixin,
metaclass=BaseOperatorMeta
:type seen_oids: set
:return: Templated content
"""
-
if not jinja_env:
jinja_env = self.get_template_env()
@@ -1034,7 +1031,6 @@ class BaseOperator(Operator, LoggingMixin, TaskMixin,
metaclass=BaseOperatorMeta
"""
Get a flat set of relatives' ids, either upstream or downstream.
"""
-
if not self._dag:
return set()
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 87690cb..9887fb9 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -826,7 +826,6 @@ class DAG(BaseDag, LoggingMixin):
def get_template_env(self) -> jinja2.Environment:
"""Build a Jinja2 environment."""
-
# Collect directories to search for template files
searchpath = [self.folder]
if self.template_searchpath:
@@ -1683,7 +1682,6 @@ class DAG(BaseDag, LoggingMixin):
:type active_dag_ids: list[unicode]
:return: None
"""
-
if len(active_dag_ids) == 0:
return
for dag in session.query(
diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py
index 1a12998..8ff058a 100644
--- a/airflow/models/dagbag.py
+++ b/airflow/models/dagbag.py
@@ -361,7 +361,6 @@ class DagBag(BaseDagBag, LoggingMixin):
Adds the DAG into the bag, recurses into sub dags.
Throws AirflowDagCycleException if a cycle is detected in this dag or
its subdags
"""
-
test_cycle(dag) # throws if a task cycle is found
dag.resolve_template_files()
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 9349656..9949b43 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -278,7 +278,6 @@ class DagRun(Base, LoggingMixin):
@provide_session
def get_previous_dagrun(self, state: Optional[str] = None, session:
Session = None) -> Optional['DagRun']:
"""The previous DagRun, if there is one"""
-
filters = [
DagRun.dag_id == self.dag_id,
DagRun.execution_date < self.execution_date,
@@ -312,7 +311,6 @@ class DagRun(Base, LoggingMixin):
:return: ready_tis: the tis that can be scheduled in the current loop
:rtype ready_tis: list[airflow.models.TaskInstance]
"""
-
dag = self.get_dag()
ready_tis: List[TI] = []
tis = list(self.get_task_instances(session=session,
state=State.task_states + (State.SHUTDOWN,)))
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index bb6a841..8a7cf1b 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -938,7 +938,6 @@ class TaskInstance(Base, LoggingMixin): # pylint:
disable=R0902,R0904
:return: whether the state was changed to running or not
:rtype: bool
"""
-
task = self.task
self.refresh_from_task(task, pool_override=pool)
self.test_mode = test_mode
@@ -1061,7 +1060,6 @@ class TaskInstance(Base, LoggingMixin): # pylint:
disable=R0902,R0904
:param session: SQLAlchemy ORM Session
:type session: Session
"""
-
task = self.task
self.test_mode = test_mode
self.refresh_from_task(task, pool_override=pool)
@@ -1719,7 +1717,6 @@ class TaskInstance(Base, LoggingMixin): # pylint:
disable=R0902,R0904
task on a future date without it being immediately visible.
:type execution_date: datetime
"""
-
if execution_date and execution_date < self.execution_date:
raise ValueError(
'execution_date can not be in the past (current '
@@ -1768,7 +1765,6 @@ class TaskInstance(Base, LoggingMixin): # pylint:
disable=R0902,R0904
are returned as well.
:type include_prior_dates: bool
"""
-
if dag_id is None:
dag_id = self.dag_id
@@ -1922,7 +1918,6 @@ class SimpleTaskInstance:
session is committed.
:return: the task instance constructed
"""
-
qry = session.query(TaskInstance).filter(
TaskInstance.dag_id == self._dag_id,
TaskInstance.task_id == self._task_id,
diff --git a/airflow/models/variable.py b/airflow/models/variable.py
index f388b10..b3cc01b 100644
--- a/airflow/models/variable.py
+++ b/airflow/models/variable.py
@@ -156,7 +156,6 @@ class Variable(Base, LoggingMixin):
:param serialize_json: Serialize the value to a JSON string
:param session: SQL Alchemy Sessions
"""
-
if serialize_json:
stored_value = json.dumps(value, indent=2)
else:
diff --git a/airflow/operators/sql.py b/airflow/operators/sql.py
index 7e005ff..d670e17 100644
--- a/airflow/operators/sql.py
+++ b/airflow/operators/sql.py
@@ -465,7 +465,6 @@ class SQLThresholdCheckOperator(BaseOperator):
Optional: Send data check info and metadata to an external database.
Default functionality will log metadata.
"""
-
info = "\n".join([f"""{key}: {item}""" for key, item in
meta_data.items()])
self.log.info("Log from %s:\n%s", self.dag_id, info)
diff --git a/airflow/providers/amazon/aws/hooks/cloud_formation.py
b/airflow/providers/amazon/aws/hooks/cloud_formation.py
index e091c4d..45b40b5 100644
--- a/airflow/providers/amazon/aws/hooks/cloud_formation.py
+++ b/airflow/providers/amazon/aws/hooks/cloud_formation.py
@@ -42,7 +42,6 @@ class AWSCloudFormationHook(AwsBaseHook):
"""
Get stack status from CloudFormation.
"""
-
self.log.info('Poking for stack %s', stack_name)
try:
@@ -63,7 +62,6 @@ class AWSCloudFormationHook(AwsBaseHook):
:param params: parameters to be passed to CloudFormation.
:type params: dict
"""
-
if 'StackName' not in params:
params['StackName'] = stack_name
self.get_conn().create_stack(**params)
@@ -77,7 +75,6 @@ class AWSCloudFormationHook(AwsBaseHook):
:param params: parameters to be passed to CloudFormation (optional).
:type params: dict
"""
-
params = params or {}
if 'StackName' not in params:
params['StackName'] = stack_name
diff --git a/airflow/providers/amazon/aws/hooks/emr.py
b/airflow/providers/amazon/aws/hooks/emr.py
index 1c65a5f..6a18690 100644
--- a/airflow/providers/amazon/aws/hooks/emr.py
+++ b/airflow/providers/amazon/aws/hooks/emr.py
@@ -49,7 +49,6 @@ class EmrHook(AwsBaseHook):
:type cluster_states: list
:return: id of the EMR cluster
"""
-
response = self.get_conn().list_clusters(ClusterStates=cluster_states)
matching_clusters = list(
@@ -73,7 +72,6 @@ class EmrHook(AwsBaseHook):
run_job_flow method.
Overrides for this config may be passed as the job_flow_overrides.
"""
-
if not self.emr_conn_id:
raise AirflowException('emr_conn_id must be present to use
create_job_flow')
diff --git a/airflow/providers/amazon/aws/hooks/glue_catalog.py
b/airflow/providers/amazon/aws/hooks/glue_catalog.py
index 27fc7c1..9a7ea5f 100644
--- a/airflow/providers/amazon/aws/hooks/glue_catalog.py
+++ b/airflow/providers/amazon/aws/hooks/glue_catalog.py
@@ -109,7 +109,6 @@ class AwsGlueCatalogHook(AwsBaseHook):
>>> r = hook.get_table('db', 'table_foo')
>>> r['Name'] = 'table_foo'
"""
-
result = self.get_conn().get_table(DatabaseName=database_name,
Name=table_name)
return result['Table']
@@ -124,7 +123,6 @@ class AwsGlueCatalogHook(AwsBaseHook):
:type table_name: str
:return: str
"""
-
table = self.get_table(database_name, table_name)
return table['StorageDescriptor']['Location']
diff --git a/airflow/providers/amazon/aws/hooks/kinesis.py
b/airflow/providers/amazon/aws/hooks/kinesis.py
index 304f8cd..6039689 100644
--- a/airflow/providers/amazon/aws/hooks/kinesis.py
+++ b/airflow/providers/amazon/aws/hooks/kinesis.py
@@ -47,7 +47,6 @@ class AwsFirehoseHook(AwsBaseHook):
"""
Write batch records to Kinesis Firehose
"""
-
response =
self.get_conn().put_record_batch(DeliveryStreamName=self.delivery_stream,
Records=records)
return response
diff --git a/airflow/providers/amazon/aws/hooks/lambda_function.py
b/airflow/providers/amazon/aws/hooks/lambda_function.py
index 008aafb..8b6ad50 100644
--- a/airflow/providers/amazon/aws/hooks/lambda_function.py
+++ b/airflow/providers/amazon/aws/hooks/lambda_function.py
@@ -62,7 +62,6 @@ class AwsLambdaHook(AwsBaseHook):
"""
Invoke Lambda Function
"""
-
response = self.conn.invoke(
FunctionName=self.function_name,
InvocationType=self.invocation_type,
diff --git a/airflow/providers/amazon/aws/hooks/logs.py
b/airflow/providers/amazon/aws/hooks/logs.py
index 16b9f94..bb22ae1 100644
--- a/airflow/providers/amazon/aws/hooks/logs.py
+++ b/airflow/providers/amazon/aws/hooks/logs.py
@@ -70,7 +70,6 @@ class AwsLogsHook(AwsBaseHook):
| 'message' (str): The log event data.
| 'ingestionTime' (int): The time in milliseconds the event
was ingested.
"""
-
next_token = None
event_count = 1
diff --git a/airflow/providers/amazon/aws/hooks/s3.py
b/airflow/providers/amazon/aws/hooks/s3.py
index 30adcc3..26713ac 100644
--- a/airflow/providers/amazon/aws/hooks/s3.py
+++ b/airflow/providers/amazon/aws/hooks/s3.py
@@ -47,7 +47,6 @@ def provide_bucket_name(func: T) -> T:
Function decorator that provides a bucket name taken from the connection
in case no bucket name has been passed to the function.
"""
-
function_signature = signature(func)
@wraps(func)
@@ -71,7 +70,6 @@ def unify_bucket_name_and_key(func: T) -> T:
Function decorator that unifies bucket name and key taken from the key
in case no bucket name and at least a key has been passed to the function.
"""
-
function_signature = signature(func)
@wraps(func)
@@ -311,7 +309,6 @@ class S3Hook(AwsBaseHook):
:return: True if the key exists and False if not.
:rtype: bool
"""
-
try:
self.get_conn().head_object(Bucket=bucket_name, Key=key)
return True
@@ -332,7 +329,6 @@ class S3Hook(AwsBaseHook):
:return: the key object from the bucket
:rtype: boto3.s3.Object
"""
-
obj = self.get_resource_type('s3').Object(bucket_name, key)
obj.load()
return obj
@@ -350,7 +346,6 @@ class S3Hook(AwsBaseHook):
:return: the content of the key
:rtype: str
"""
-
obj = self.get_key(key, bucket_name)
return obj.get()['Body'].read().decode('utf-8')
@@ -447,7 +442,6 @@ class S3Hook(AwsBaseHook):
:return: the key object from the bucket or None if none has been found.
:rtype: boto3.s3.Object
"""
-
prefix = re.split(r'[*]', wildcard_key, 1)[0]
key_list = self.list_keys(bucket_name, prefix=prefix,
delimiter=delimiter)
key_matches = [k for k in key_list if fnmatch.fnmatch(k, wildcard_key)]
@@ -489,7 +483,6 @@ class S3Hook(AwsBaseHook):
uploaded to the S3 bucket.
:type acl_policy: str
"""
-
if not replace and self.check_for_key(key, bucket_name):
raise ValueError("The key {key} already exists.".format(key=key))
@@ -813,7 +806,6 @@ class S3Hook(AwsBaseHook):
:return: The presigned url.
:rtype: str
"""
-
s3_client = self.get_conn()
try:
return s3_client.generate_presigned_url(
diff --git a/airflow/providers/amazon/aws/hooks/sagemaker.py
b/airflow/providers/amazon/aws/hooks/sagemaker.py
index fb5aed6..b1bd3ba 100644
--- a/airflow/providers/amazon/aws/hooks/sagemaker.py
+++ b/airflow/providers/amazon/aws/hooks/sagemaker.py
@@ -105,7 +105,6 @@ def secondary_training_status_message(job_description,
prev_description):
:return: Job status string to be printed.
"""
-
if (
job_description is None
or job_description.get('SecondaryStatusTransitions') is None
@@ -336,7 +335,6 @@ class SageMakerHook(AwsBaseHook): # pylint:
disable=too-many-public-methods
:type max_ingestion_time: int
:return: A response to training job creation
"""
-
self.check_training_config(config)
response = self.get_conn().create_training_job(**config)
@@ -382,7 +380,6 @@ class SageMakerHook(AwsBaseHook): # pylint:
disable=too-many-public-methods
:type max_ingestion_time: int
:return: A response to tuning job creation
"""
-
self.check_tuning_config(config)
response = self.get_conn().create_hyper_parameter_tuning_job(**config)
@@ -415,7 +412,6 @@ class SageMakerHook(AwsBaseHook): # pylint:
disable=too-many-public-methods
:type max_ingestion_time: int
:return: A response to transform job creation
"""
-
self.check_s3_url(config['TransformInput']['DataSource']['S3DataSource']['S3Uri'])
response = self.get_conn().create_transform_job(**config)
@@ -448,7 +444,6 @@ class SageMakerHook(AwsBaseHook): # pylint:
disable=too-many-public-methods
:type max_ingestion_time: int
:return: A response to transform job creation
"""
-
response = self.get_conn().create_processing_job(**config)
if wait_for_completion:
self.check_status(
@@ -468,7 +463,6 @@ class SageMakerHook(AwsBaseHook): # pylint:
disable=too-many-public-methods
:type config: dict
:return: A response to model creation
"""
-
return self.get_conn().create_model(**config)
def create_endpoint_config(self, config):
@@ -479,7 +473,6 @@ class SageMakerHook(AwsBaseHook): # pylint:
disable=too-many-public-methods
:type config: dict
:return: A response to endpoint config creation
"""
-
return self.get_conn().create_endpoint_config(**config)
def create_endpoint(self, config, wait_for_completion=True,
check_interval=30, max_ingestion_time=None):
@@ -499,7 +492,6 @@ class SageMakerHook(AwsBaseHook): # pylint:
disable=too-many-public-methods
:type max_ingestion_time: int
:return: A response to endpoint creation
"""
-
response = self.get_conn().create_endpoint(**config)
if wait_for_completion:
self.check_status(
@@ -529,7 +521,6 @@ class SageMakerHook(AwsBaseHook): # pylint:
disable=too-many-public-methods
:type max_ingestion_time: int
:return: A response to endpoint update
"""
-
response = self.get_conn().update_endpoint(**config)
if wait_for_completion:
self.check_status(
@@ -550,7 +541,6 @@ class SageMakerHook(AwsBaseHook): # pylint:
disable=too-many-public-methods
:type name: str
:return: A dict contains all the training job info
"""
-
return self.get_conn().describe_training_job(TrainingJobName=name)
def describe_training_job_with_log(
@@ -624,7 +614,6 @@ class SageMakerHook(AwsBaseHook): # pylint:
disable=too-many-public-methods
:type name: str
:return: A dict contains all the tuning job info
"""
-
return
self.get_conn().describe_hyper_parameter_tuning_job(HyperParameterTuningJobName=name)
def describe_model(self, name):
@@ -635,7 +624,6 @@ class SageMakerHook(AwsBaseHook): # pylint:
disable=too-many-public-methods
:type name: str
:return: A dict contains all the model info
"""
-
return self.get_conn().describe_model(ModelName=name)
def describe_transform_job(self, name):
@@ -646,7 +634,6 @@ class SageMakerHook(AwsBaseHook): # pylint:
disable=too-many-public-methods
:type name: str
:return: A dict contains all the transform job info
"""
-
return self.get_conn().describe_transform_job(TransformJobName=name)
def describe_processing_job(self, name):
@@ -657,7 +644,6 @@ class SageMakerHook(AwsBaseHook): # pylint:
disable=too-many-public-methods
:type name: str
:return: A dict contains all the processing job info
"""
-
return self.get_conn().describe_processing_job(ProcessingJobName=name)
def describe_endpoint_config(self, name):
@@ -668,7 +654,6 @@ class SageMakerHook(AwsBaseHook): # pylint:
disable=too-many-public-methods
:type name: str
:return: A dict contains all the endpoint config info
"""
-
return
self.get_conn().describe_endpoint_config(EndpointConfigName=name)
def describe_endpoint(self, name):
@@ -677,7 +662,6 @@ class SageMakerHook(AwsBaseHook): # pylint:
disable=too-many-public-methods
:type name: str
:return: A dict contains all the endpoint info
"""
-
return self.get_conn().describe_endpoint(EndpointName=name)
def check_status(
@@ -769,7 +753,6 @@ class SageMakerHook(AwsBaseHook): # pylint:
disable=too-many-public-methods
:type max_ingestion_time: int
:return: None
"""
-
sec = 0
description = self.describe_training_job(job_name)
self.log.info(secondary_training_status_message(description, None))
@@ -856,7 +839,6 @@ class SageMakerHook(AwsBaseHook): # pylint:
disable=too-many-public-methods
:param kwargs: (optional) kwargs to boto3's list_training_jobs method
:return: results of the list_training_jobs request
"""
-
config = {}
if name_contains:
@@ -893,7 +875,6 @@ class SageMakerHook(AwsBaseHook): # pylint:
disable=too-many-public-methods
:param kwargs: (optional) kwargs to boto3's list_training_jobs method
:return: results of the list_processing_jobs request
"""
-
list_processing_jobs_request =
partial(self.get_conn().list_processing_jobs, **kwargs)
results = self._list_request(
list_processing_jobs_request, "ProcessingJobSummaries",
max_results=kwargs.get("MaxResults")
@@ -915,7 +896,6 @@ class SageMakerHook(AwsBaseHook): # pylint:
disable=too-many-public-methods
:param max_results: maximum number of results to return (None =
infinite)
:return: Results of the list_* request
"""
-
sagemaker_max_results = 100 # Fixed number set by AWS
results: List[Dict] = []
diff --git a/airflow/providers/amazon/aws/operators/sqs.py
b/airflow/providers/amazon/aws/operators/sqs.py
index 00b29db..6005195 100644
--- a/airflow/providers/amazon/aws/operators/sqs.py
+++ b/airflow/providers/amazon/aws/operators/sqs.py
@@ -70,7 +70,6 @@ class SQSPublishOperator(BaseOperator):
For details of the returned dict see
:py:meth:`botocore.client.SQS.send_message`
:rtype: dict
"""
-
hook = SQSHook(aws_conn_id=self.aws_conn_id)
result = hook.send_message(
diff --git a/airflow/providers/amazon/aws/transfers/s3_to_sftp.py
b/airflow/providers/amazon/aws/transfers/s3_to_sftp.py
index 092bbe8..97b1918 100644
--- a/airflow/providers/amazon/aws/transfers/s3_to_sftp.py
+++ b/airflow/providers/amazon/aws/transfers/s3_to_sftp.py
@@ -62,7 +62,6 @@ class S3ToSFTPOperator(BaseOperator):
@staticmethod
def get_s3_key(s3_key):
"""This parses the correct format for S3 keys regardless of how the S3
url is passed."""
-
parsed_s3_key = urlparse(s3_key)
return parsed_s3_key.path.lstrip('/')
diff --git a/airflow/providers/amazon/aws/transfers/sftp_to_s3.py
b/airflow/providers/amazon/aws/transfers/sftp_to_s3.py
index 087eb74..6cf1a73 100644
--- a/airflow/providers/amazon/aws/transfers/sftp_to_s3.py
+++ b/airflow/providers/amazon/aws/transfers/sftp_to_s3.py
@@ -62,7 +62,6 @@ class SFTPToS3Operator(BaseOperator):
@staticmethod
def get_s3_key(s3_key):
"""This parses the correct format for S3 keys regardless of how the S3
url is passed."""
-
parsed_s3_key = urlparse(s3_key)
return parsed_s3_key.path.lstrip('/')
diff --git a/airflow/providers/apache/druid/transfers/hive_to_druid.py
b/airflow/providers/apache/druid/transfers/hive_to_druid.py
index 44ac3fc..a1f14b6 100644
--- a/airflow/providers/apache/druid/transfers/hive_to_druid.py
+++ b/airflow/providers/apache/druid/transfers/hive_to_druid.py
@@ -176,7 +176,6 @@ class HiveToDruidOperator(BaseOperator):
:param columns: List of all the columns that are available
:type columns: list
"""
-
# backward compatibility for num_shards,
# but target_partition_size is the default setting
# and overwrites the num_shards
diff --git a/airflow/providers/apache/hive/hooks/hive.py
b/airflow/providers/apache/hive/hooks/hive.py
index 6b7042a..a0d5e87 100644
--- a/airflow/providers/apache/hive/hooks/hive.py
+++ b/airflow/providers/apache/hive/hooks/hive.py
@@ -979,7 +979,6 @@ class HiveServer2Hook(DbApiHook):
:type hive_conf: dict
"""
-
results_iter = self._get_results(hql, schema, fetch_size=fetch_size,
hive_conf=hive_conf)
header = next(results_iter)
message = None
diff --git a/airflow/providers/apache/pig/hooks/pig.py
b/airflow/providers/apache/pig/hooks/pig.py
index 4152dd2..4b08def 100644
--- a/airflow/providers/apache/pig/hooks/pig.py
+++ b/airflow/providers/apache/pig/hooks/pig.py
@@ -49,7 +49,6 @@ class PigCliHook(BaseHook):
>>> ("hdfs://" in result)
True
"""
-
with TemporaryDirectory(prefix='airflow_pigop_') as tmp_dir:
with NamedTemporaryFile(dir=tmp_dir) as f:
f.write(pig.encode('utf-8'))
diff --git a/airflow/providers/apache/spark/hooks/spark_jdbc_script.py
b/airflow/providers/apache/spark/hooks/spark_jdbc_script.py
index ffc9a3e..572ad5e 100644
--- a/airflow/providers/apache/spark/hooks/spark_jdbc_script.py
+++ b/airflow/providers/apache/spark/hooks/spark_jdbc_script.py
@@ -43,7 +43,6 @@ def set_common_options(
:param password: JDBC resource password
:param driver: JDBC resource driver
"""
-
spark_source = (
spark_source.format('jdbc')
.option('url', url)
@@ -110,7 +109,6 @@ def spark_read_from_jdbc(
"""
Transfer data from JDBC source to Spark
"""
-
# first set common options
reader = set_common_options(spark_session.read, url, jdbc_table, user,
password, driver)
diff --git a/airflow/providers/apache/spark/hooks/spark_submit.py
b/airflow/providers/apache/spark/hooks/spark_submit.py
index 13983e9..93cb15d 100644
--- a/airflow/providers/apache/spark/hooks/spark_submit.py
+++ b/airflow/providers/apache/spark/hooks/spark_submit.py
@@ -561,7 +561,6 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
Unable to run or restart due to an unrecoverable error
(e.g. missing jar file)
"""
-
# When your Spark Standalone cluster is not performing well
# due to misconfiguration or heavy loads.
# it is possible that the polling request will timeout.
@@ -604,7 +603,6 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
Construct the spark-submit command to kill a driver.
:return: full command to kill a driver
"""
-
# If the spark_home is passed then build the spark-submit executable
path using
# the spark_home; otherwise assume that spark-submit is present in the
path to
# the executing user
@@ -630,7 +628,6 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
"""
Kill Spark submit command
"""
-
self.log.debug("Kill Command is being called")
if self._should_track_driver_status:
diff --git a/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
b/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
index 6ff0b9b..2cadd28 100644
--- a/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
+++ b/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
@@ -193,7 +193,6 @@ class KubernetesHook(BaseHook):
:param namespace: kubernetes namespace
:type namespace: str
"""
-
api = client.CoreV1Api(self.api_client)
watcher = watch.Watch()
return (
diff --git a/airflow/providers/elasticsearch/log/es_task_handler.py
b/airflow/providers/elasticsearch/log/es_task_handler.py
index d3d07c1..35c99ea 100644
--- a/airflow/providers/elasticsearch/log/es_task_handler.py
+++ b/airflow/providers/elasticsearch/log/es_task_handler.py
@@ -212,7 +212,6 @@ class ElasticsearchTaskHandler(FileTaskHandler,
LoggingMixin):
:param metadata: log metadata, used for steaming log download.
:type metadata: dict
"""
-
# Offset is the unique key for sorting logs given log_id.
search = Search(using=self.client).query('match_phrase',
log_id=log_id).sort('offset')
diff --git a/airflow/providers/google/cloud/hooks/bigquery.py
b/airflow/providers/google/cloud/hooks/bigquery.py
index aa991f9..ae6770b 100644
--- a/airflow/providers/google/cloud/hooks/bigquery.py
+++ b/airflow/providers/google/cloud/hooks/bigquery.py
@@ -408,7 +408,6 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
:param exists_ok: If ``True``, ignore "already exists" errors when
creating the DATASET.
:type exists_ok: bool
"""
-
dataset_reference = dataset_reference or {"datasetReference": {}}
for param, value in zip(["datasetId", "projectId"], [dataset_id,
project_id]):
@@ -602,7 +601,6 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
}
:type encryption_configuration: dict
"""
-
warnings.warn(
"This method is deprecated. Please use
`BigQueryHook.create_empty_table` method with"
"pass passing the `table_resource` object. This gives more
flexibility than this method.",
@@ -967,7 +965,6 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
:rtype: dataset
https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource
"""
-
warnings.warn("This method is deprecated. Please use
``update_dataset``.", DeprecationWarning)
project_id = project_id or self.project_id
if not dataset_id or not isinstance(dataset_id, str):
@@ -2919,7 +2916,6 @@ def _validate_src_fmt_configs(
:param backward_compatibility_configs: The top-level params for
backward-compatibility
:type backward_compatibility_configs: dict
"""
-
if backward_compatibility_configs is None:
backward_compatibility_configs = {}
diff --git a/airflow/providers/google/cloud/hooks/bigtable.py
b/airflow/providers/google/cloud/hooks/bigtable.py
index f9912f7..c6eb84f 100644
--- a/airflow/providers/google/cloud/hooks/bigtable.py
+++ b/airflow/providers/google/cloud/hooks/bigtable.py
@@ -320,7 +320,6 @@ class BigtableHook(GoogleBaseHook):
:param table_id: The ID of the table in Cloud Bigtable to fetch Column
Families
from.
"""
-
table = Table(table_id, instance)
return table.list_column_families()
@@ -336,6 +335,5 @@ class BigtableHook(GoogleBaseHook):
:param table_id: The ID of the table in Cloud Bigtable to fetch
Cluster States
from.
"""
-
table = Table(table_id, instance)
return table.get_cluster_states()
diff --git
a/airflow/providers/google/cloud/hooks/cloud_storage_transfer_service.py
b/airflow/providers/google/cloud/hooks/cloud_storage_transfer_service.py
index e76fdbc..a5bdce4 100644
--- a/airflow/providers/google/cloud/hooks/cloud_storage_transfer_service.py
+++ b/airflow/providers/google/cloud/hooks/cloud_storage_transfer_service.py
@@ -345,7 +345,6 @@ class CloudDataTransferServiceHook(GoogleBaseHook):
:type operation_name: str
:rtype: None
"""
-
self.get_conn().transferOperations().cancel(name=operation_name).execute( #
pylint: disable=no-member
num_retries=self.num_retries
)
diff --git a/airflow/providers/google/cloud/hooks/dataprep.py
b/airflow/providers/google/cloud/hooks/dataprep.py
index 29a0e32..41f27a7 100644
--- a/airflow/providers/google/cloud/hooks/dataprep.py
+++ b/airflow/providers/google/cloud/hooks/dataprep.py
@@ -63,7 +63,6 @@ class GoogleDataprepHook(BaseHook):
:param job_id: The ID of the job that will be fetched
:type job_id: int
"""
-
endpoint_path = f"v4/jobGroups/{job_id}/jobs"
url: str = os.path.join(self._base_url, endpoint_path)
response = requests.get(url, headers=self._headers)
@@ -83,7 +82,6 @@ class GoogleDataprepHook(BaseHook):
:param include_deleted: if set to "true", will include deleted objects
:type include_deleted: bool
"""
-
params: Dict[str, Any] = {"embed": embed, "includeDeleted":
include_deleted}
endpoint_path = f"v4/jobGroups/{job_group_id}"
url: str = os.path.join(self._base_url, endpoint_path)
@@ -102,7 +100,6 @@ class GoogleDataprepHook(BaseHook):
:param body_request: The identifier for the recipe you would like to
run.
:type body_request: dict
"""
-
endpoint_path = "v4/jobGroups"
url: str = os.path.join(self._base_url, endpoint_path)
response = requests.post(url, headers=self._headers,
data=json.dumps(body_request))
diff --git a/airflow/providers/google/cloud/hooks/dlp.py
b/airflow/providers/google/cloud/hooks/dlp.py
index 4e44608..e88b655 100644
--- a/airflow/providers/google/cloud/hooks/dlp.py
+++ b/airflow/providers/google/cloud/hooks/dlp.py
@@ -134,7 +134,6 @@ class CloudDLPHook(GoogleBaseHook):
:param metadata: (Optional) Additional metadata that is provided to
the method.
:type metadata: Sequence[Tuple[str, str]]
"""
-
client = self.get_conn()
if not dlp_job_id:
@@ -179,7 +178,6 @@ class CloudDLPHook(GoogleBaseHook):
:type metadata: Sequence[Tuple[str, str]]
:rtype: google.cloud.dlp_v2.types.DeidentifyTemplate
"""
-
client = self.get_conn()
# Handle project_id from connection configuration
project_id = project_id or self.project_id
@@ -243,7 +241,6 @@ class CloudDLPHook(GoogleBaseHook):
of the operation results. Defaults to 60.
:type time_to_sleep_in_seconds: int
"""
-
client = self.get_conn()
parent = DlpServiceClient.project_path(project_id)
@@ -322,7 +319,6 @@ class CloudDLPHook(GoogleBaseHook):
:type metadata: Sequence[Tuple[str, str]]
:rtype: google.cloud.dlp_v2.types.InspectTemplate
"""
-
client = self.get_conn()
# Handle project_id from connection configuration
@@ -377,7 +373,6 @@ class CloudDLPHook(GoogleBaseHook):
:type metadata: Sequence[Tuple[str, str]]
:rtype: google.cloud.dlp_v2.types.JobTrigger
"""
-
client = self.get_conn()
parent = DlpServiceClient.project_path(project_id)
@@ -425,7 +420,6 @@ class CloudDLPHook(GoogleBaseHook):
:type metadata: Sequence[Tuple[str, str]]
:rtype: google.cloud.dlp_v2.types.StoredInfoType
"""
-
client = self.get_conn()
# Handle project_id from connection configuration
@@ -495,7 +489,6 @@ class CloudDLPHook(GoogleBaseHook):
:type metadata: Sequence[Tuple[str, str]]
:rtype: google.cloud.dlp_v2.types.DeidentifyContentResponse
"""
-
client = self.get_conn()
parent = DlpServiceClient.project_path(project_id)
@@ -536,7 +529,6 @@ class CloudDLPHook(GoogleBaseHook):
:param metadata: (Optional) Additional metadata that is provided to
the method.
:type metadata: Sequence[Tuple[str, str]]
"""
-
client = self.get_conn()
if not template_id:
@@ -583,7 +575,6 @@ class CloudDLPHook(GoogleBaseHook):
:param metadata: (Optional) Additional metadata that is provided to
the method.
:type metadata: Sequence[Tuple[str, str]]
"""
-
client = self.get_conn()
if not dlp_job_id:
@@ -623,7 +614,6 @@ class CloudDLPHook(GoogleBaseHook):
:param metadata: (Optional) Additional metadata that is provided to
the method.
:type metadata: Sequence[Tuple[str, str]]
"""
-
client = self.get_conn()
if not template_id:
@@ -669,7 +659,6 @@ class CloudDLPHook(GoogleBaseHook):
:param metadata: (Optional) Additional metadata that is provided to
the method.
:type metadata: Sequence[Tuple[str, str]]
"""
-
client = self.get_conn()
if not job_trigger_id:
@@ -709,7 +698,6 @@ class CloudDLPHook(GoogleBaseHook):
:param metadata: (Optional) Additional metadata that is provided to
the method.
:type metadata: Sequence[Tuple[str, str]]
"""
-
client = self.get_conn()
if not stored_info_type_id:
@@ -759,7 +747,6 @@ class CloudDLPHook(GoogleBaseHook):
:type metadata: Sequence[Tuple[str, str]]
:rtype: google.cloud.dlp_v2.types.DeidentifyTemplate
"""
-
client = self.get_conn()
if not template_id:
@@ -806,7 +793,6 @@ class CloudDLPHook(GoogleBaseHook):
:type metadata: Sequence[Tuple[str, str]]
:rtype: google.cloud.dlp_v2.types.DlpJob
"""
-
client = self.get_conn()
if not dlp_job_id:
@@ -847,7 +833,6 @@ class CloudDLPHook(GoogleBaseHook):
:type metadata: Sequence[Tuple[str, str]]
:rtype: google.cloud.dlp_v2.types.InspectTemplate
"""
-
client = self.get_conn()
if not template_id:
@@ -894,7 +879,6 @@ class CloudDLPHook(GoogleBaseHook):
:type metadata: Sequence[Tuple[str, str]]
:rtype: google.cloud.dlp_v2.types.JobTrigger
"""
-
client = self.get_conn()
if not job_trigger_id:
@@ -935,7 +919,6 @@ class CloudDLPHook(GoogleBaseHook):
:type metadata: Sequence[Tuple[str, str]]
:rtype: google.cloud.dlp_v2.types.StoredInfoType
"""
-
client = self.get_conn()
if not stored_info_type_id:
@@ -991,7 +974,6 @@ class CloudDLPHook(GoogleBaseHook):
:type metadata: Sequence[Tuple[str, str]]
:rtype: google.cloud.dlp_v2.types.InspectContentResponse
"""
-
client = self.get_conn()
parent = DlpServiceClient.project_path(project_id)
@@ -1042,7 +1024,6 @@ class CloudDLPHook(GoogleBaseHook):
:type metadata: Sequence[Tuple[str, str]]
:rtype: List[google.cloud.dlp_v2.types.DeidentifyTemplate]
"""
-
client = self.get_conn()
# Handle project_id from connection configuration
@@ -1106,7 +1087,6 @@ class CloudDLPHook(GoogleBaseHook):
:type metadata: Sequence[Tuple[str, str]]
:rtype: List[google.cloud.dlp_v2.types.DlpJob]
"""
-
client = self.get_conn()
parent = DlpServiceClient.project_path(project_id)
@@ -1150,7 +1130,6 @@ class CloudDLPHook(GoogleBaseHook):
:type metadata: Sequence[Tuple[str, str]]
:rtype: google.cloud.dlp_v2.types.ListInfoTypesResponse
"""
-
client = self.get_conn()
return client.list_info_types(
@@ -1198,7 +1177,6 @@ class CloudDLPHook(GoogleBaseHook):
:type metadata: Sequence[Tuple[str, str]]
:rtype: List[google.cloud.dlp_v2.types.InspectTemplate]
"""
-
client = self.get_conn()
# Handle project_id from connection configuration
@@ -1258,7 +1236,6 @@ class CloudDLPHook(GoogleBaseHook):
:type metadata: Sequence[Tuple[str, str]]
:rtype: List[google.cloud.dlp_v2.types.JobTrigger]
"""
-
client = self.get_conn()
parent = DlpServiceClient.project_path(project_id)
@@ -1310,7 +1287,6 @@ class CloudDLPHook(GoogleBaseHook):
:type metadata: Sequence[Tuple[str, str]]
:rtype: List[google.cloud.dlp_v2.types.StoredInfoType]
"""
-
client = self.get_conn()
# Handle project_id from connection configuration
@@ -1378,7 +1354,6 @@ class CloudDLPHook(GoogleBaseHook):
:type metadata: Sequence[Tuple[str, str]]
:rtype: google.cloud.dlp_v2.types.RedactImageResponse
"""
-
client = self.get_conn()
parent = DlpServiceClient.project_path(project_id)
@@ -1438,7 +1413,6 @@ class CloudDLPHook(GoogleBaseHook):
:type metadata: Sequence[Tuple[str, str]]
:rtype: google.cloud.dlp_v2.types.ReidentifyContentResponse
"""
-
client = self.get_conn()
parent = DlpServiceClient.project_path(project_id)
@@ -1492,7 +1466,6 @@ class CloudDLPHook(GoogleBaseHook):
:type metadata: Sequence[Tuple[str, str]]
:rtype: google.cloud.dlp_v2.types.DeidentifyTemplate
"""
-
client = self.get_conn()
if not template_id:
@@ -1555,7 +1528,6 @@ class CloudDLPHook(GoogleBaseHook):
:type metadata: Sequence[Tuple[str, str]]
:rtype: google.cloud.dlp_v2.types.InspectTemplate
"""
-
client = self.get_conn()
if not template_id:
@@ -1614,7 +1586,6 @@ class CloudDLPHook(GoogleBaseHook):
:type metadata: Sequence[Tuple[str, str]]
:rtype: google.cloud.dlp_v2.types.JobTrigger
"""
-
client = self.get_conn()
if not job_trigger_id:
@@ -1669,7 +1640,6 @@ class CloudDLPHook(GoogleBaseHook):
:type metadata: Sequence[Tuple[str, str]]
:rtype: google.cloud.dlp_v2.types.StoredInfoType
"""
-
client = self.get_conn()
if not stored_info_type_id:
diff --git a/airflow/providers/google/cloud/hooks/gcs.py
b/airflow/providers/google/cloud/hooks/gcs.py
index aec2871..f370877 100644
--- a/airflow/providers/google/cloud/hooks/gcs.py
+++ b/airflow/providers/google/cloud/hooks/gcs.py
@@ -263,7 +263,6 @@ class GCSHook(GoogleBaseHook):
:param filename: If set, a local file path where the file should be
written to.
:type filename: str
"""
-
# TODO: future improvement check file size before downloading,
# to check for local space availability
@@ -531,7 +530,6 @@ class GCSHook(GoogleBaseHook):
allows to delete non empty bucket
:type: bool
"""
-
client = self.get_conn()
bucket = client.bucket(bucket_name)
@@ -698,7 +696,6 @@ class GCSHook(GoogleBaseHook):
:type labels: dict
:return: If successful, it returns the ``id`` of the bucket.
"""
-
self.log.info(
'Creating Bucket: %s; Location: %s; Storage Class: %s',
bucket_name, location, storage_class
)
@@ -806,7 +803,6 @@ class GCSHook(GoogleBaseHook):
:param destination_object: The path of the object if given.
:type destination_object: str
"""
-
if not source_objects:
raise ValueError('source_objects cannot be empty.')
@@ -995,7 +991,6 @@ def _parse_gcs_url(gsurl: str) -> Tuple[str, str]:
Given a Google Cloud Storage URL (gs://<bucket>/<blob>), returns a
tuple containing the corresponding bucket and blob.
"""
-
parsed_url = urlparse(gsurl)
if not parsed_url.netloc:
raise AirflowException('Please provide a bucket name')
diff --git a/airflow/providers/google/cloud/hooks/kubernetes_engine.py
b/airflow/providers/google/cloud/hooks/kubernetes_engine.py
index 5512846..2d4475b 100644
--- a/airflow/providers/google/cloud/hooks/kubernetes_engine.py
+++ b/airflow/providers/google/cloud/hooks/kubernetes_engine.py
@@ -166,7 +166,6 @@ class GKEHook(GoogleBaseHook):
:type timeout: float
:return: The full url to the delete operation if successful, else None
"""
-
self.log.info("Deleting (project_id=%s, zone=%s, cluster_id=%s)",
project_id, self.location, name)
try:
@@ -207,7 +206,6 @@ class GKEHook(GoogleBaseHook):
ParseError: On JSON parsing problems when trying to convert dict
AirflowException: cluster is not dict type nor Cluster proto type
"""
-
if isinstance(cluster, dict):
cluster_proto = Cluster()
cluster = ParseDict(cluster, cluster_proto)
diff --git a/airflow/providers/google/cloud/hooks/stackdriver.py
b/airflow/providers/google/cloud/hooks/stackdriver.py
index edd898d..aa80485 100644
--- a/airflow/providers/google/cloud/hooks/stackdriver.py
+++ b/airflow/providers/google/cloud/hooks/stackdriver.py
@@ -351,7 +351,6 @@ class StackdriverHook(GoogleBaseHook):
:param metadata: Additional metadata that is provided to the method.
:type metadata: str
"""
-
policy_client = self._get_policy_client()
try:
policy_client.delete_alert_policy(name=name, retry=retry,
timeout=timeout, metadata=metadata)
@@ -408,7 +407,6 @@ class StackdriverHook(GoogleBaseHook):
:param project_id: The project to fetch notification channels from.
:type project_id: str
"""
-
client = self._get_channel_client()
channels = client.list_notification_channels(
name='projects/{project_id}'.format(project_id=project_id),
@@ -482,7 +480,6 @@ class StackdriverHook(GoogleBaseHook):
:param metadata: Additional metadata that is provided to the method.
:type metadata: str
"""
-
self._toggle_channel_status(
project_id=project_id,
filter_=filter_,
@@ -521,7 +518,6 @@ class StackdriverHook(GoogleBaseHook):
:param metadata: Additional metadata that is provided to the method.
:type metadata: str
"""
-
self._toggle_channel_status(
filter_=filter_,
project_id=project_id,
@@ -561,7 +557,6 @@ class StackdriverHook(GoogleBaseHook):
:param metadata: Additional metadata that is provided to the method.
:type metadata: str
"""
-
channel_client = self._get_channel_client()
record = json.loads(channels)
@@ -624,7 +619,6 @@ class StackdriverHook(GoogleBaseHook):
:param metadata: Additional metadata that is provided to the method.
:type metadata: str
"""
-
channel_client = self._get_channel_client()
try:
channel_client.delete_notification_channel(
diff --git a/airflow/providers/google/cloud/hooks/tasks.py
b/airflow/providers/google/cloud/hooks/tasks.py
index 3375f43..1bbbd75 100644
--- a/airflow/providers/google/cloud/hooks/tasks.py
+++ b/airflow/providers/google/cloud/hooks/tasks.py
@@ -117,7 +117,6 @@ class CloudTasksHook(GoogleBaseHook):
:type metadata: sequence[tuple[str, str]]]
:rtype: google.cloud.tasks_v2.types.Queue
"""
-
client = self.get_conn()
if queue_name:
@@ -180,7 +179,6 @@ class CloudTasksHook(GoogleBaseHook):
:type metadata: sequence[tuple[str, str]]]
:rtype: google.cloud.tasks_v2.types.Queue
"""
-
client = self.get_conn()
if queue_name and location:
@@ -230,7 +228,6 @@ class CloudTasksHook(GoogleBaseHook):
:type metadata: sequence[tuple[str, str]]]
:rtype: google.cloud.tasks_v2.types.Queue
"""
-
client = self.get_conn()
full_queue_name = CloudTasksClient.queue_path(project_id, location,
queue_name)
@@ -271,7 +268,6 @@ class CloudTasksHook(GoogleBaseHook):
:type metadata: sequence[tuple[str, str]]]
:rtype: list[google.cloud.tasks_v2.types.Queue]
"""
-
client = self.get_conn()
full_location_path = CloudTasksClient.location_path(project_id,
location)
@@ -315,7 +311,6 @@ class CloudTasksHook(GoogleBaseHook):
:param metadata: (Optional) Additional metadata that is provided to
the method.
:type metadata: sequence[tuple[str, str]]]
"""
-
client = self.get_conn()
full_queue_name = CloudTasksClient.queue_path(project_id, location,
queue_name)
@@ -352,7 +347,6 @@ class CloudTasksHook(GoogleBaseHook):
:type metadata: sequence[tuple[str, str]]]
:rtype: list[google.cloud.tasks_v2.types.Queue]
"""
-
client = self.get_conn()
full_queue_name = CloudTasksClient.queue_path(project_id, location,
queue_name)
@@ -389,7 +383,6 @@ class CloudTasksHook(GoogleBaseHook):
:type metadata: sequence[tuple[str, str]]]
:rtype: list[google.cloud.tasks_v2.types.Queue]
"""
-
client = self.get_conn()
full_queue_name = CloudTasksClient.queue_path(project_id, location,
queue_name)
@@ -426,7 +419,6 @@ class CloudTasksHook(GoogleBaseHook):
:type metadata: sequence[tuple[str, str]]]
:rtype: list[google.cloud.tasks_v2.types.Queue]
"""
-
client = self.get_conn()
full_queue_name = CloudTasksClient.queue_path(project_id, location,
queue_name)
@@ -475,7 +467,6 @@ class CloudTasksHook(GoogleBaseHook):
:type metadata: sequence[tuple[str, str]]]
:rtype: google.cloud.tasks_v2.types.Task
"""
-
client = self.get_conn()
if task_name:
@@ -534,7 +525,6 @@ class CloudTasksHook(GoogleBaseHook):
:type metadata: sequence[tuple[str, str]]]
:rtype: google.cloud.tasks_v2.types.Task
"""
-
client = self.get_conn()
full_task_name = CloudTasksClient.task_path(project_id, location,
queue_name, task_name)
@@ -585,7 +575,6 @@ class CloudTasksHook(GoogleBaseHook):
:type metadata: sequence[tuple[str, str]]]
:rtype: list[google.cloud.tasks_v2.types.Task]
"""
-
client = self.get_conn()
full_queue_name = CloudTasksClient.queue_path(project_id, location,
queue_name)
tasks = client.list_tasks(
@@ -631,7 +620,6 @@ class CloudTasksHook(GoogleBaseHook):
:param metadata: (Optional) Additional metadata that is provided to
the method.
:type metadata: sequence[tuple[str, str]]]
"""
-
client = self.get_conn()
full_task_name = CloudTasksClient.task_path(project_id, location,
queue_name, task_name)
@@ -675,7 +663,6 @@ class CloudTasksHook(GoogleBaseHook):
:type metadata: sequence[tuple[str, str]]]
:rtype: google.cloud.tasks_v2.types.Task
"""
-
client = self.get_conn()
full_task_name = CloudTasksClient.task_path(project_id, location,
queue_name, task_name)
diff --git
a/airflow/providers/google/cloud/operators/cloud_storage_transfer_service.py
b/airflow/providers/google/cloud/operators/cloud_storage_transfer_service.py
index c2405e5..dc70431 100644
--- a/airflow/providers/google/cloud/operators/cloud_storage_transfer_service.py
+++ b/airflow/providers/google/cloud/operators/cloud_storage_transfer_service.py
@@ -110,7 +110,6 @@ class TransferJobPreprocessor:
:return: Preprocessed body
:rtype: dict
"""
-
self._inject_aws_credentials()
self._reformat_schedule()
return self.body
diff --git a/airflow/providers/google/cloud/operators/mlengine.py
b/airflow/providers/google/cloud/operators/mlengine.py
index 93719a8..2a42709 100644
--- a/airflow/providers/google/cloud/operators/mlengine.py
+++ b/airflow/providers/google/cloud/operators/mlengine.py
@@ -44,7 +44,6 @@ def _normalize_mlengine_job_id(job_id: str) -> str:
:return: A valid job_id representation.
:rtype: str
"""
-
# Add a prefix when a job_id starts with a digit or a template
match = re.search(r'\d|\{{2}', job_id)
if match and match.start() == 0:
diff --git a/airflow/providers/google/cloud/operators/pubsub.py
b/airflow/providers/google/cloud/operators/pubsub.py
index 59d6a86..544b615 100644
--- a/airflow/providers/google/cloud/operators/pubsub.py
+++ b/airflow/providers/google/cloud/operators/pubsub.py
@@ -960,7 +960,6 @@ class PubSubPullOperator(BaseOperator):
:param context: same as in `execute`
:return: value to be saved to XCom.
"""
-
messages_json = [MessageToDict(m) for m in pulled_messages]
return messages_json
diff --git a/airflow/providers/google/cloud/sensors/pubsub.py
b/airflow/providers/google/cloud/sensors/pubsub.py
index 3be0ead..0b997e1 100644
--- a/airflow/providers/google/cloud/sensors/pubsub.py
+++ b/airflow/providers/google/cloud/sensors/pubsub.py
@@ -202,7 +202,6 @@ class PubSubPullSensor(BaseSensorOperator):
:param context: same as in `execute`
:return: value to be saved to XCom.
"""
-
messages_json = [MessageToDict(m) for m in pulled_messages]
return messages_json
diff --git a/airflow/providers/google/marketing_platform/hooks/analytics.py
b/airflow/providers/google/marketing_platform/hooks/analytics.py
index 972bf1f..528e1ed 100644
--- a/airflow/providers/google/marketing_platform/hooks/analytics.py
+++ b/airflow/providers/google/marketing_platform/hooks/analytics.py
@@ -66,7 +66,6 @@ class GoogleAnalyticsHook(GoogleBaseHook):
"""
Lists accounts list from Google Analytics 360.
"""
-
self.log.info("Retrieving accounts list...")
conn = self.get_conn()
accounts = conn.management().accounts() # pylint: disable=no-member
@@ -89,7 +88,6 @@ class GoogleAnalyticsHook(GoogleBaseHook):
:returns: web property-Google Ads
:rtype: Dict
"""
-
self.log.info("Retrieving ad words links...")
ad_words_link = (
self.get_conn() # pylint: disable=no-member
@@ -116,7 +114,6 @@ class GoogleAnalyticsHook(GoogleBaseHook):
:returns: list of entity Google Ads links.
:rtype: list
"""
-
self.log.info("Retrieving ad words list...")
conn = self.get_conn()
ads_links = conn.management().webPropertyAdWordsLinks() # pylint:
disable=no-member
@@ -147,7 +144,6 @@ class GoogleAnalyticsHook(GoogleBaseHook):
series of at least two requests.
:type resumable_upload: bool
"""
-
media = MediaFileUpload(
file_location,
mimetype="application/octet-stream",
@@ -187,7 +183,6 @@ class GoogleAnalyticsHook(GoogleBaseHook):
:param delete_request_body: Dict of customDataImportUids to delete.
:type delete_request_body: dict
"""
-
self.log.info(
"Deleting previous uploads to GA file for accountId:%s, "
"webPropertyId:%s and customDataSourceId:%s ",
diff --git a/airflow/providers/google/marketing_platform/hooks/display_video.py
b/airflow/providers/google/marketing_platform/hooks/display_video.py
index df793d9..d580954 100644
--- a/airflow/providers/google/marketing_platform/hooks/display_video.py
+++ b/airflow/providers/google/marketing_platform/hooks/display_video.py
@@ -94,7 +94,6 @@ class GoogleDisplayVideo360Hook(GoogleBaseHook):
LineItem, Creative, Pixel, InventorySource, UserList,
UniversalChannel, and summary.
:type entity_type: str
"""
-
return [f"gdbm-{partner_id}/entity/{{{{ ds_nodash
}}}}.*.{entity_type}.json"]
def create_query(self, query: Dict[str, Any]) -> Dict:
@@ -181,7 +180,6 @@ class GoogleDisplayVideo360Hook(GoogleBaseHook):
:return: response body.
:rtype: List[Dict[str, Any]]
"""
-
request_body = {
"lineItems": line_items,
"dryRun": False,
@@ -205,7 +203,6 @@ class GoogleDisplayVideo360Hook(GoogleBaseHook):
https://developers.google.com/bid-manager/v1.1/lineitems/downloadlineitems
:type request_body: Dict[str, Any]
"""
-
response = (
self.get_conn() # pylint: disable=no-member
.lineitems()
@@ -224,7 +221,6 @@ class GoogleDisplayVideo360Hook(GoogleBaseHook):
More information about body request n be found here:
https://developers.google.com/display-video/api/reference/rest/v1/sdfdownloadtasks/create
"""
-
result = (
self.get_conn_to_display_video() # pylint: disable=no-member
.sdfdownloadtasks()
@@ -240,7 +236,6 @@ class GoogleDisplayVideo360Hook(GoogleBaseHook):
:param operation_name: The name of the operation resource.
:type operation_name: str
"""
-
result = (
self.get_conn_to_display_video() # pylint: disable=no-member
.sdfdownloadtasks()
@@ -257,7 +252,6 @@ class GoogleDisplayVideo360Hook(GoogleBaseHook):
:param resource_name: of the media that is being downloaded.
:type resource_name: str
"""
-
request = (
self.get_conn_to_display_video() # pylint: disable=no-member
.media()
diff --git a/airflow/providers/imap/hooks/imap.py
b/airflow/providers/imap/hooks/imap.py
index 6b96faf..15ab214 100644
--- a/airflow/providers/imap/hooks/imap.py
+++ b/airflow/providers/imap/hooks/imap.py
@@ -63,7 +63,6 @@ class ImapHook(BaseHook):
:return: an authorized ImapHook object.
:rtype: ImapHook
"""
-
if not self.mail_client:
conn = self.get_connection(self.imap_conn_id)
self.mail_client = imaplib.IMAP4_SSL(conn.host)
diff --git a/airflow/providers/jdbc/hooks/jdbc.py
b/airflow/providers/jdbc/hooks/jdbc.py
index 645195c..48086d2 100644
--- a/airflow/providers/jdbc/hooks/jdbc.py
+++ b/airflow/providers/jdbc/hooks/jdbc.py
@@ -75,5 +75,4 @@ class JdbcHook(DbApiHook):
:return: connection autocommit setting.
:rtype: bool
"""
-
return conn.jconn.getAutoCommit()
diff --git a/airflow/providers/microsoft/azure/hooks/azure_batch.py
b/airflow/providers/microsoft/azure/hooks/azure_batch.py
index ad7e341..720cd5a 100644
--- a/airflow/providers/microsoft/azure/hooks/azure_batch.py
+++ b/airflow/providers/microsoft/azure/hooks/azure_batch.py
@@ -192,7 +192,6 @@ class AzureBatchHook(BaseHook):
:param sku_starts_with: The start name of the sku to search
:type sku_starts_with: str
"""
-
options =
batch_models.AccountListSupportedImagesOptions(filter="verificationType eq
'verified'")
images =
self.connection.account.list_supported_images(account_list_supported_images_options=options)
# pick the latest supported sku
@@ -243,7 +242,6 @@ class AzureBatchHook(BaseHook):
:param display_name: The display name for the job
:type display_name: str
"""
-
job = batch_models.JobAddParameter(
id=job_id,
pool_info=batch_models.PoolInformation(pool_id=pool_id),
diff --git a/airflow/providers/microsoft/azure/hooks/wasb.py
b/airflow/providers/microsoft/azure/hooks/wasb.py
index 07fbe61..f19b3b2 100644
--- a/airflow/providers/microsoft/azure/hooks/wasb.py
+++ b/airflow/providers/microsoft/azure/hooks/wasb.py
@@ -184,7 +184,6 @@ class WasbHook(BaseHook):
`BlockBlobService.create_blob_from_path()` takes.
:type kwargs: object
"""
-
if is_prefix:
blobs_to_delete = [
blob.name for blob in
self.connection.list_blobs(container_name, prefix=blob_name, **kwargs)
diff --git a/airflow/providers/mysql/hooks/mysql.py
b/airflow/providers/mysql/hooks/mysql.py
index 2e8432e..ff882c0 100644
--- a/airflow/providers/mysql/hooks/mysql.py
+++ b/airflow/providers/mysql/hooks/mysql.py
@@ -208,7 +208,6 @@ class MySqlHook(DbApiHook):
:return: The same cell
:rtype: object
"""
-
return cell
def get_iam_token(self, conn: Connection) -> Tuple[str, int]:
diff --git a/airflow/providers/redis/operators/redis_publish.py
b/airflow/providers/redis/operators/redis_publish.py
index fc85201..be8fc50 100644
--- a/airflow/providers/redis/operators/redis_publish.py
+++ b/airflow/providers/redis/operators/redis_publish.py
@@ -52,7 +52,6 @@ class RedisPublishOperator(BaseOperator):
:param context: the context object
:type context: dict
"""
-
redis_hook = RedisHook(redis_conn_id=self.redis_conn_id)
self.log.info('Sending messsage %s to Redis on channel %s',
self.message, self.channel)
diff --git a/airflow/providers/salesforce/hooks/salesforce.py
b/airflow/providers/salesforce/hooks/salesforce.py
index 02d073f..b9d1bad 100644
--- a/airflow/providers/salesforce/hooks/salesforce.py
+++ b/airflow/providers/salesforce/hooks/salesforce.py
@@ -291,7 +291,6 @@ class SalesforceHook(BaseHook):
:return: the dataframe.
:rtype: pandas.Dataframe
"""
-
# this line right here will convert all integers to floats
# if there are any None/np.nan values in the column
# that's because None/np.nan cannot exist in an integer column
diff --git a/airflow/providers/sendgrid/utils/emailer.py
b/airflow/providers/sendgrid/utils/emailer.py
index 64c2114..791627d 100644
--- a/airflow/providers/sendgrid/utils/emailer.py
+++ b/airflow/providers/sendgrid/utils/emailer.py
@@ -61,7 +61,6 @@ def send_email(
.. note::
For more information, see :ref:`email-configuration-sendgrid`
"""
-
if files is None:
files = []
diff --git a/airflow/providers/slack/hooks/slack.py
b/airflow/providers/slack/hooks/slack.py
index 8f349aa..7d53f6c 100644
--- a/airflow/providers/slack/hooks/slack.py
+++ b/airflow/providers/slack/hooks/slack.py
@@ -100,5 +100,4 @@ class SlackHook(BaseHook): # noqa
:param json: JSON for the body to attach to the request. Optional.
:type json: dict
"""
-
self.client.api_call(api_method, *args, **kwargs)
diff --git a/airflow/providers/ssh/hooks/ssh.py
b/airflow/providers/ssh/hooks/ssh.py
index 03948b8..0be2431 100644
--- a/airflow/providers/ssh/hooks/ssh.py
+++ b/airflow/providers/ssh/hooks/ssh.py
@@ -169,7 +169,6 @@ class SSHHook(BaseHook):
:rtype: paramiko.client.SSHClient
"""
-
self.log.debug('Creating SSH client for conn_id: %s', self.ssh_conn_id)
client = paramiko.SSHClient()
@@ -240,7 +239,6 @@ class SSHHook(BaseHook):
:return: sshtunnel.SSHTunnelForwarder object
"""
-
if local_port:
local_bind_address = ('localhost', local_port)
else:
diff --git a/airflow/serialization/serialized_objects.py
b/airflow/serialization/serialized_objects.py
index 60d63ad..56c104a 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -129,7 +129,6 @@ class BaseSerialization:
@classmethod
def _is_excluded(cls, var: Any, attrname: str, instance: Any) -> bool:
"""Types excluded from serialization."""
-
if var is None:
if not cls._is_constructor_param(attrname, instance):
# Any instance attribute, that is not a constructor argument,
we exclude None as the default
diff --git a/airflow/settings.py b/airflow/settings.py
index f6a9c4a..1e5ca06 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -285,7 +285,6 @@ def prepare_syspath():
"""
Ensures that certain subfolders of AIRFLOW_HOME are on the classpath
"""
-
if DAGS_FOLDER not in sys.path:
sys.path.append(DAGS_FOLDER)
diff --git a/airflow/ti_deps/deps/trigger_rule_dep.py
b/airflow/ti_deps/deps/trigger_rule_dep.py
index e80c6d3..237a26f 100644
--- a/airflow/ti_deps/deps/trigger_rule_dep.py
+++ b/airflow/ti_deps/deps/trigger_rule_dep.py
@@ -110,7 +110,6 @@ class TriggerRuleDep(BaseTIDep):
:param session: database session
:type session: sqlalchemy.orm.session.Session
"""
-
task = ti.task
upstream = len(task.upstream_task_ids)
trigger_rule = task.trigger_rule
diff --git a/airflow/utils/cli.py b/airflow/utils/cli.py
index 6794d0f..dc0dbeb 100644
--- a/airflow/utils/cli.py
+++ b/airflow/utils/cli.py
@@ -105,7 +105,6 @@ def _build_metrics(func_name, namespace):
:param namespace: Namespace instance from argparse
:return: dict with metrics
"""
-
metrics = {'sub_command': func_name, 'start_datetime': datetime.utcnow(),
'full_command': '{}'.format(list(sys.argv)), 'user':
getpass.getuser()}
diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py
index 00cd2bd..9363c6b 100644
--- a/airflow/utils/dag_processing.py
+++ b/airflow/utils/dag_processing.py
@@ -651,7 +651,6 @@ class DagFileProcessorManager(LoggingMixin): # pylint:
disable=too-many-instanc
we can get parallelism and isolation from potentially harmful
user code.
"""
-
self.register_exit_signals()
# Start a new process group
@@ -856,7 +855,6 @@ class DagFileProcessorManager(LoggingMixin): # pylint:
disable=too-many-instanc
:type known_file_paths: list[unicode]
:return: None
"""
-
# File Path: Path to the file containing the DAG definition
# PID: PID associated with the process that's processing the file. May
# be empty.
@@ -1263,7 +1261,6 @@ class DagFileProcessorManager(LoggingMixin): # pylint:
disable=too-many-instanc
This is called once every time around the parsing "loop" - i.e. after
all files have been parsed.
"""
-
parse_time = (timezone.utcnow() -
self._parsing_start_time).total_seconds()
Stats.gauge('dag_processing.total_parse_time', parse_time)
Stats.gauge('dagbag_size', sum(stat.num_dags for stat in
self._file_stats.values()))
diff --git a/airflow/utils/dates.py b/airflow/utils/dates.py
index 23b21cd..25a481d 100644
--- a/airflow/utils/dates.py
+++ b/airflow/utils/dates.py
@@ -147,7 +147,6 @@ def round_time(dt, delta,
start_date=timezone.make_aware(datetime.min)):
>>> round_time(datetime(2015, 9, 13, 0, 0), timedelta(1), datetime(2015,
9, 14, 0, 0))
datetime.datetime(2015, 9, 14, 0, 0)
"""
-
if isinstance(delta, str):
# It's cron based, so it's easy
time_zone = start_date.tzinfo
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index b0b2426..4943edd 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -625,7 +625,6 @@ def resetdb():
"""
Clear out the database
"""
-
log.info("Dropping tables that exist")
connection = settings.engine.connect()
diff --git a/airflow/utils/decorators.py b/airflow/utils/decorators.py
index e2da083..af743e4 100644
--- a/airflow/utils/decorators.py
+++ b/airflow/utils/decorators.py
@@ -40,7 +40,6 @@ def apply_defaults(func: T) -> T:
inheritance and argument defaults, this decorator also alerts with
specific information about the missing arguments.
"""
-
# Cache inspect.signature for the wrapper closure to avoid calling it
# at every decorated invocation. This is separate sig_cache created
# per decoration, i.e. each function decorated using apply_defaults will
diff --git a/airflow/utils/email.py b/airflow/utils/email.py
index 5c1b921..ce332a4 100644
--- a/airflow/utils/email.py
+++ b/airflow/utils/email.py
@@ -153,7 +153,6 @@ def send_mime_email(e_from: str, e_to: List[str], mime_msg:
MIMEMultipart, dryru
"""
Send MIME email.
"""
-
smtp_host = conf.get('smtp', 'SMTP_HOST')
smtp_port = conf.getint('smtp', 'SMTP_PORT')
smtp_starttls = conf.getboolean('smtp', 'SMTP_STARTTLS')
diff --git a/airflow/utils/file.py b/airflow/utils/file.py
index f2a5b9f..d659a9d 100644
--- a/airflow/utils/file.py
+++ b/airflow/utils/file.py
@@ -67,7 +67,6 @@ def correct_maybe_zipped(fileloc):
If the path contains a folder with a .zip suffix, then
the folder is treated as a zip archive and path to zip is returned.
"""
-
_, archive, _ = ZIP_REGEX.search(fileloc).groups()
if archive and zipfile.is_zipfile(archive):
return archive
@@ -99,7 +98,6 @@ def find_path_from_directory(
:return : file path not to be ignored.
"""
-
patterns_by_dir: Dict[str, List[Pattern[str]]] = {}
for root, dirs, files in os.walk(str(base_dir_path), followlinks=True):
@@ -174,7 +172,6 @@ def list_py_file_paths(directory: str,
def find_dag_file_paths(directory: str, file_paths: list, safe_mode: bool):
"""Finds file paths of all DAG files."""
-
for file_path in find_path_from_directory(
directory, ".airflowignore"):
try:
diff --git a/airflow/utils/log/log_reader.py b/airflow/utils/log/log_reader.py
index b54e901..42d03d9 100644
--- a/airflow/utils/log/log_reader.py
+++ b/airflow/utils/log/log_reader.py
@@ -54,7 +54,6 @@ class TaskLogReader:
contain information about the task log which can enable you read logs
to the
end.
"""
-
logs, metadatas = self.log_handler.read(ti, try_number,
metadata=metadata)
metadata = metadatas[0]
return logs, metadata
@@ -72,7 +71,6 @@ class TaskLogReader:
:type metadata: dict
:rtype: Iterator[str]
"""
-
if try_number is None:
next_try = ti.next_try_number
try_numbers = list(range(1, next_try))
@@ -90,7 +88,6 @@ class TaskLogReader:
@cached_property
def log_handler(self):
"""Log handler, which is configured to read logs."""
-
logger = logging.getLogger('airflow.task')
task_log_reader = conf.get('logging', 'task_log_reader')
handler = next((handler for handler in logger.handlers if handler.name
== task_log_reader), None)
@@ -99,7 +96,6 @@ class TaskLogReader:
@property
def supports_read(self):
"""Checks if a read operation is supported by a current log handler."""
-
return hasattr(self.log_handler, 'read')
@property
@@ -117,7 +113,6 @@ class TaskLogReader:
:type try_number: Optional[int]
:rtype: str
"""
-
filename_template = conf.get('logging', 'LOG_FILENAME_TEMPLATE')
attachment_filename = render_log_filename(
ti=ti,
diff --git a/airflow/utils/timezone.py b/airflow/utils/timezone.py
index b5c9ccb..95a555f 100644
--- a/airflow/utils/timezone.py
+++ b/airflow/utils/timezone.py
@@ -55,7 +55,6 @@ def utcnow() -> dt.datetime:
:return:
"""
-
# pendulum utcnow() is not used as that sets a TimezoneInfo object
# instead of a Timezone. This is not pickable and also creates issues
# when using replace()
@@ -71,7 +70,6 @@ def utc_epoch() -> dt.datetime:
:return:
"""
-
# pendulum utcnow() is not used as that sets a TimezoneInfo object
# instead of a Timezone. This is not pickable and also creates issues
# when using replace()
diff --git a/airflow/www/api/experimental/endpoints.py
b/airflow/www/api/experimental/endpoints.py
index 6805ed7..f7534c3 100644
--- a/airflow/www/api/experimental/endpoints.py
+++ b/airflow/www/api/experimental/endpoints.py
@@ -219,7 +219,6 @@ def task_info(dag_id, task_id):
@requires_authentication
def dag_paused(dag_id, paused):
"""(Un)pauses a dag"""
-
is_paused = bool(paused == 'true')
models.DagModel.get_dagmodel(dag_id).set_is_paused(
@@ -233,7 +232,6 @@ def dag_paused(dag_id, paused):
@requires_authentication
def dag_is_paused(dag_id):
"""Get paused state of a dag"""
-
is_paused = models.DagModel.get_dagmodel(dag_id).is_paused
return jsonify({'is_paused': is_paused})
@@ -250,7 +248,6 @@ def task_instance_info(dag_id, execution_date, task_id):
"YYYY-mm-DDTHH:MM:SS", for example: "2016-11-16T11:34:15". This will
of course need to have been encoded for URL in the request.
"""
-
# Convert string datetime into actual datetime
try:
execution_date = timezone.parse(execution_date)
@@ -291,7 +288,6 @@ def dag_run_status(dag_id, execution_date):
"YYYY-mm-DDTHH:MM:SS", for example: "2016-11-16T11:34:15". This will
of course need to have been encoded for URL in the request.
"""
-
# Convert string datetime into actual datetime
try:
execution_date = timezone.parse(execution_date)
diff --git a/airflow/www/utils.py b/airflow/www/utils.py
index 555b31d..6dc6941 100644
--- a/airflow/www/utils.py
+++ b/airflow/www/utils.py
@@ -96,7 +96,6 @@ def generate_pages(current_page,
:param window: the number of pages to be shown in the paging component (7
default)
:return: the HTML string of the paging component
"""
-
void_link = 'javascript:void(0)'
first_node = Markup("""<li class="paginate_button {disabled}"
id="dags_first">
<a href="{href_link}" aria-controls="dags" data-dt-idx="0"
tabindex="0">«</a>
diff --git a/airflow/www/views.py b/airflow/www/views.py
index e009c94..ecd6b2f 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -232,7 +232,6 @@ def dag_edges(dag):
upstream_join_id >> task5
upstream_join_id >> task6
"""
-
# Edges to add between TaskGroup
edges_to_add = set()
# Edges to remove between individual tasks that are replaced by
edges_to_add.
@@ -370,7 +369,6 @@ class Airflow(AirflowBaseView): # noqa: D101 pylint:
disable=too-many-public-m
An endpoint helping check the health status of the Airflow instance,
including metadatabase and scheduler.
"""
-
payload = {
'metadatabase': {'status': 'unhealthy'}
}