This is an automated email from the ASF dual-hosted git repository. potiuk pushed a commit to branch v2-0-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 95889128afec4ebb8a3dfe8a8e13a7264d1a3bd6 Author: Joshua Carp <[email protected]> AuthorDate: Sat Jan 23 00:19:38 2021 -0500 Switch to f-strings using flynt. (#13732) (cherry picked from commit a9ac2b040b64de1aa5d9c2b9def33334e36a8d22) --- .pre-commit-config.yaml | 7 ++++ BREEZE.rst | 2 +- STATIC_CODE_CHECKS.rst | 2 + airflow/api/common/experimental/get_code.py | 2 +- airflow/api/common/experimental/pool.py | 6 +-- .../api_connexion/endpoints/connection_endpoint.py | 2 +- airflow/cli/commands/dag_command.py | 2 +- airflow/cli/commands/task_command.py | 11 ++---- airflow/cli/commands/user_command.py | 8 ++-- airflow/cli/commands/variable_command.py | 2 +- airflow/configuration.py | 2 +- .../example_passing_params_via_test_command.py | 6 +-- airflow/example_dags/example_trigger_target_dag.py | 2 +- airflow/example_dags/subdags/subdag.py | 2 +- airflow/example_dags/tutorial_taskflow_api_etl.py | 2 +- airflow/hooks/dbapi.py | 2 +- airflow/kubernetes/refresh_config.py | 2 +- airflow/models/connection.py | 6 +-- airflow/models/dag.py | 2 +- airflow/models/taskinstance.py | 4 +- airflow/models/xcom.py | 4 +- airflow/operators/sql.py | 14 +++---- airflow/providers/amazon/aws/hooks/datasync.py | 4 +- airflow/providers/amazon/aws/hooks/dynamodb.py | 4 +- airflow/providers/amazon/aws/hooks/sagemaker.py | 4 +- .../providers/amazon/aws/log/s3_task_handler.py | 2 +- airflow/providers/amazon/aws/operators/datasync.py | 2 +- .../amazon/aws/operators/emr_add_steps.py | 2 +- .../amazon/aws/operators/emr_create_job_flow.py | 2 +- .../amazon/aws/operators/emr_modify_cluster.py | 2 +- .../amazon/aws/operators/emr_terminate_job_flow.py | 2 +- .../amazon/aws/operators/sagemaker_endpoint.py | 2 +- .../aws/operators/sagemaker_endpoint_config.py | 2 +- .../amazon/aws/operators/sagemaker_model.py | 2 +- .../amazon/aws/operators/sagemaker_processing.py | 2 +- .../amazon/aws/operators/sagemaker_training.py | 2 +- .../amazon/aws/operators/sagemaker_transform.py | 2 +- 
.../amazon/aws/operators/sagemaker_tuning.py | 2 +- .../providers/amazon/aws/sensors/sagemaker_base.py | 2 +- airflow/providers/apache/druid/hooks/druid.py | 4 +- airflow/providers/apache/hdfs/sensors/hdfs.py | 2 +- airflow/providers/apache/hive/hooks/hive.py | 8 +--- .../providers/apache/hive/operators/hive_stats.py | 24 +++++------- airflow/providers/apache/spark/hooks/spark_jdbc.py | 2 +- .../providers/apache/spark/hooks/spark_submit.py | 8 ++-- .../providers/cncf/kubernetes/hooks/kubernetes.py | 6 +-- .../cncf/kubernetes/operators/kubernetes_pod.py | 3 +- .../cncf/kubernetes/sensors/spark_kubernetes.py | 2 +- airflow/providers/databricks/hooks/databricks.py | 2 +- airflow/providers/docker/operators/docker_swarm.py | 2 +- airflow/providers/ftp/hooks/ftp.py | 4 +- airflow/providers/google/cloud/hooks/bigquery.py | 4 +- airflow/providers/google/cloud/hooks/compute.py | 20 +++------- airflow/providers/google/cloud/hooks/dataflow.py | 12 +++--- airflow/providers/google/cloud/hooks/functions.py | 4 +- .../google/cloud/hooks/kubernetes_engine.py | 2 +- .../providers/google/cloud/log/gcs_task_handler.py | 4 +- .../providers/google/cloud/operators/bigquery.py | 4 +- .../providers/google/cloud/operators/dataproc.py | 2 +- .../jenkins/operators/jenkins_job_trigger.py | 6 +-- airflow/providers/jira/hooks/jira.py | 4 +- airflow/providers/jira/operators/jira.py | 4 +- .../azure/operators/azure_container_instances.py | 2 +- .../providers/microsoft/winrm/operators/winrm.py | 2 +- airflow/providers/mysql/hooks/mysql.py | 21 +++------- airflow/providers/opsgenie/hooks/opsgenie_alert.py | 2 +- airflow/providers/oracle/hooks/oracle.py | 2 +- airflow/providers/pagerduty/hooks/pagerduty.py | 2 +- airflow/providers/plexus/operators/job.py | 4 +- airflow/providers/postgres/hooks/postgres.py | 2 +- airflow/providers/qubole/hooks/qubole.py | 4 +- airflow/providers/salesforce/hooks/salesforce.py | 2 +- airflow/providers/sftp/operators/sftp.py | 2 +- airflow/providers/ssh/operators/ssh.py 
| 2 +- airflow/security/kerberos.py | 4 +- airflow/security/utils.py | 2 +- airflow/sensors/date_time.py | 2 +- airflow/sensors/sql.py | 2 +- airflow/utils/cli.py | 2 +- airflow/utils/code_utils.py | 2 +- airflow/utils/log/file_task_handler.py | 6 +-- airflow/utils/timezone.py | 2 +- airflow/www/utils.py | 10 ++--- airflow/www/validators.py | 4 +- airflow/www/views.py | 19 +++------ breeze-complete | 1 + docs/exts/airflow_intersphinx.py | 2 +- docs/exts/exampleinclude.py | 2 +- docs/exts/redirects.py | 2 +- metastore_browser/hive_metastore.py | 14 +++---- .../pre_commit_check_provider_yaml_files.py | 8 ++-- scripts/ci/pre_commit/pre_commit_yaml_to_cfg.py | 6 +-- tests/api/common/experimental/test_pool.py | 2 +- tests/cli/commands/test_connection_command.py | 14 +++---- tests/core/test_core.py | 2 +- tests/dags/test_subdag.py | 2 +- tests/dags_corrupted/test_impersonation_custom.py | 2 +- tests/executors/test_celery_executor.py | 2 +- tests/hooks/test_dbapi.py | 2 +- tests/models/test_baseoperator.py | 4 +- tests/models/test_connection.py | 2 +- tests/models/test_dag.py | 4 +- tests/models/test_dagbag.py | 6 +-- tests/models/test_renderedtifields.py | 2 +- .../amazon/aws/hooks/test_batch_waiters.py | 2 +- tests/providers/amazon/aws/hooks/test_s3.py | 4 +- .../apache/hive/operators/test_hive_stats.py | 14 +++---- .../apache/hive/transfers/test_mysql_to_hive.py | 24 +++++------- .../providers/apache/spark/hooks/test_spark_sql.py | 10 ++--- tests/providers/apache/sqoop/hooks/test_sqoop.py | 45 +++++++++------------- .../elasticsearch/log/elasticmock/__init__.py | 2 +- .../providers/google/cloud/hooks/test_cloud_sql.py | 4 +- tests/providers/google/cloud/hooks/test_pubsub.py | 32 +++++++-------- .../google/cloud/operators/test_dataflow.py | 2 +- .../google/cloud/operators/test_mlengine_utils.py | 6 +-- tests/providers/google/cloud/sensors/test_gcs.py | 2 +- .../cloud/transfers/test_bigquery_to_bigquery.py | 2 +- .../google/cloud/transfers/test_gcs_to_gcs.py | 2 +- 
.../cloud/utils/test_mlengine_operator_utils.py | 6 +-- tests/providers/mysql/hooks/test_mysql.py | 6 +-- tests/serialization/test_dag_serialization.py | 4 +- tests/test_utils/gcp_system_helpers.py | 2 +- tests/test_utils/logging_command_executor.py | 2 +- tests/test_utils/mock_operators.py | 2 +- tests/utils/test_helpers.py | 4 +- tests/www/api/experimental/test_endpoints.py | 2 +- tests/www/test_views.py | 16 ++++---- 127 files changed, 280 insertions(+), 361 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index aea91e3..0eb96fd 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -609,4 +609,11 @@ repos: entry: "./scripts/ci/pre_commit/pre_commit_in_container_bats_test.sh" files: ^tests/bats/in_container/.*.bats$|^scripts/in_container/.*sh pass_filenames: false + - id: flynt + name: Convert to f-strings with flynt + entry: flynt + language: python + language_version: python3 + additional_dependencies: ['flynt'] + files: \.py$ ## ONLY ADD PRE-COMMITS HERE THAT REQUIRE CI IMAGE diff --git a/BREEZE.rst b/BREEZE.rst index 131f94c..f4689ba 100644 --- a/BREEZE.rst +++ b/BREEZE.rst @@ -2232,7 +2232,7 @@ This is the current syntax for `./breeze <./breeze>`_: check-executables-have-shebangs check-hooks-apply check-integrations check-merge-conflict check-xml consistent-pylint daysago-import-check debug-statements detect-private-key doctoc dont-use-safe-filter end-of-file-fixer - fix-encoding-pragma flake8 forbid-tabs helm-lint identity + fix-encoding-pragma flake8 flynt forbid-tabs helm-lint identity incorrect-use-of-LoggingMixin insert-license isort json-schema language-matters lint-dockerfile lint-openapi markdownlint mermaid mixed-line-ending mypy mypy-helm no-providers-in-core-examples no-relative-imports pre-commit-descriptions diff --git a/STATIC_CODE_CHECKS.rst b/STATIC_CODE_CHECKS.rst index 55674bf..1c28dd9 100644 --- a/STATIC_CODE_CHECKS.rst +++ b/STATIC_CODE_CHECKS.rst @@ -102,6 +102,8 @@ require Breeze Docker images 
to be installed locally: ----------------------------------- ---------------------------------------------------------------- ------------ ``flake8`` Runs flake8. * ----------------------------------- ---------------------------------------------------------------- ------------ +``flynt`` Runs flynt. +----------------------------------- ---------------------------------------------------------------- ------------ ``forbid-tabs`` Fails if tabs are used in the project. ----------------------------------- ---------------------------------------------------------------- ------------ ``helm-lint`` Verifies if helm lint passes for the chart diff --git a/airflow/api/common/experimental/get_code.py b/airflow/api/common/experimental/get_code.py index 99f248b..79b0b9f 100644 --- a/airflow/api/common/experimental/get_code.py +++ b/airflow/api/common/experimental/get_code.py @@ -32,5 +32,5 @@ def get_code(dag_id: str) -> str: try: return DagCode.get_code_by_fileloc(dag.fileloc) except (OSError, DagCodeNotFound) as exception: - error_message = "Error {} while reading Dag id {} Code".format(str(exception), dag_id) + error_message = f"Error {str(exception)} while reading Dag id {dag_id} Code" raise AirflowException(error_message, exception) diff --git a/airflow/api/common/experimental/pool.py b/airflow/api/common/experimental/pool.py index 0f1d1c7..30950ea 100644 --- a/airflow/api/common/experimental/pool.py +++ b/airflow/api/common/experimental/pool.py @@ -29,7 +29,7 @@ def get_pool(name, session=None): pool = session.query(Pool).filter_by(pool=name).first() if pool is None: - raise PoolNotFound("Pool '%s' doesn't exist" % name) + raise PoolNotFound(f"Pool '{name}' doesn't exist") return pool @@ -49,7 +49,7 @@ def create_pool(name, slots, description, session=None): try: slots = int(slots) except ValueError: - raise AirflowBadRequest("Bad value for `slots`: %s" % slots) + raise AirflowBadRequest(f"Bad value for `slots`: {slots}") # Get the length of the pool column 
pool_name_length = Pool.pool.property.columns[0].type.length @@ -81,7 +81,7 @@ def delete_pool(name, session=None): pool = session.query(Pool).filter_by(pool=name).first() if pool is None: - raise PoolNotFound("Pool '%s' doesn't exist" % name) + raise PoolNotFound(f"Pool '{name}' doesn't exist") session.delete(pool) session.commit() diff --git a/airflow/api_connexion/endpoints/connection_endpoint.py b/airflow/api_connexion/endpoints/connection_endpoint.py index ecee686..df3bd41 100644 --- a/airflow/api_connexion/endpoints/connection_endpoint.py +++ b/airflow/api_connexion/endpoints/connection_endpoint.py @@ -124,4 +124,4 @@ def post_connection(session): session.add(connection) session.commit() return connection_schema.dump(connection) - raise AlreadyExists(detail="Connection already exist. ID: %s" % conn_id) + raise AlreadyExists(detail=f"Connection already exist. ID: {conn_id}") diff --git a/airflow/cli/commands/dag_command.py b/airflow/cli/commands/dag_command.py index 40f8834..9b050ec 100644 --- a/airflow/cli/commands/dag_command.py +++ b/airflow/cli/commands/dag_command.py @@ -169,7 +169,7 @@ def set_is_paused(is_paused, args): dag.set_is_paused(is_paused=is_paused) - print("Dag: {}, paused: {}".format(args.dag_id, str(is_paused))) + print(f"Dag: {args.dag_id}, paused: {is_paused}") def dag_show(args): diff --git a/airflow/cli/commands/task_command.py b/airflow/cli/commands/task_command.py index c2794d4..fac4c26 100644 --- a/airflow/cli/commands/task_command.py +++ b/airflow/cli/commands/task_command.py @@ -406,14 +406,11 @@ def task_render(args): for attr in task.__class__.template_fields: print( textwrap.dedent( - """\ + f""" # ---------------------------------------------------------- + # property: {attr} # ---------------------------------------------------------- - # property: {} - # ---------------------------------------------------------- - {} - """.format( - attr, getattr(task, attr) - ) + {getattr(task, attr)} + """ ) ) diff --git 
a/airflow/cli/commands/user_command.py b/airflow/cli/commands/user_command.py index 3fd80bd..1274745 100644 --- a/airflow/cli/commands/user_command.py +++ b/airflow/cli/commands/user_command.py @@ -98,7 +98,7 @@ def users_manage_role(args, remove=False): appbuilder = cached_app().appbuilder # pylint: disable=no-member user = appbuilder.sm.find_user(username=args.username) or appbuilder.sm.find_user(email=args.email) if not user: - raise SystemExit('User "{}" does not exist'.format(args.username or args.email)) + raise SystemExit(f'User "{args.username or args.email}" does not exist') role = appbuilder.sm.find_role(args.role) if not role: @@ -144,7 +144,7 @@ def users_export(args): with open(args.export, 'w') as file: file.write(json.dumps(users, sort_keys=True, indent=4)) - print("{} users successfully exported to {}".format(len(users), file.name)) + print(f"{len(users)} users successfully exported to {file.name}") @cli_utils.action_logging @@ -191,7 +191,7 @@ def _import_users(users_list): # pylint: disable=redefined-outer-name existing_user = appbuilder.sm.find_user(email=user['email']) if existing_user: - print("Found existing user with email '{}'".format(user['email'])) + print(f"Found existing user with email '{user['email']}'") existing_user.roles = roles existing_user.first_name = user['firstname'] existing_user.last_name = user['lastname'] @@ -206,7 +206,7 @@ def _import_users(users_list): # pylint: disable=redefined-outer-name appbuilder.sm.update_user(existing_user) users_updated.append(user['email']) else: - print("Creating new user with email '{}'".format(user['email'])) + print(f"Creating new user with email '{user['email']}'") appbuilder.sm.add_user( username=user['username'], first_name=user['firstname'], diff --git a/airflow/cli/commands/variable_command.py b/airflow/cli/commands/variable_command.py index 55cf94b..526f094 100644 --- a/airflow/cli/commands/variable_command.py +++ b/airflow/cli/commands/variable_command.py @@ -92,7 +92,7 @@ def 
_import_helper(filepath): try: Variable.set(k, v, serialize_json=not isinstance(v, str)) except Exception as e: # pylint: disable=broad-except - print('Variable import failed: {}'.format(repr(e))) + print(f'Variable import failed: {repr(e)}') fail_count += 1 else: suc_count += 1 diff --git a/airflow/configuration.py b/airflow/configuration.py index ecd2bc6..5b765de 100644 --- a/airflow/configuration.py +++ b/airflow/configuration.py @@ -536,7 +536,7 @@ class AirflowConfigParser(ConfigParser): # pylint: disable=too-many-ancestors # This is based on the configparser.RawConfigParser.write method code to add support for # reading options from environment variables. if space_around_delimiters: - delimiter = " {} ".format(self._delimiters[0]) + delimiter = f" {self._delimiters[0]} " else: delimiter = self._delimiters[0] if self._defaults: diff --git a/airflow/example_dags/example_passing_params_via_test_command.py b/airflow/example_dags/example_passing_params_via_test_command.py index 8eaadd7..456def2 100644 --- a/airflow/example_dags/example_passing_params_via_test_command.py +++ b/airflow/example_dags/example_passing_params_via_test_command.py @@ -52,7 +52,7 @@ def my_py_command(test_mode, params): ) ) # Print out the value of "miff", passed in below via the Python Operator - print(" 'miff' was passed in via task params = {}".format(params["miff"])) + print(f" 'miff' was passed in via task params = {params['miff']}") return 1 @@ -83,8 +83,8 @@ def print_env_vars(test_mode): --env-vars '{"foo":"bar"}'` """ if test_mode: - print("foo={}".format(os.environ.get('foo'))) - print("AIRFLOW_TEST_MODE={}".format(os.environ.get('AIRFLOW_TEST_MODE'))) + print(f"foo={os.environ.get('foo')}") + print(f"AIRFLOW_TEST_MODE={os.environ.get('AIRFLOW_TEST_MODE')}") env_var_test_task = PythonOperator(task_id='env_var_test_task', python_callable=print_env_vars, dag=dag) diff --git a/airflow/example_dags/example_trigger_target_dag.py b/airflow/example_dags/example_trigger_target_dag.py 
index 0355275..f431dc4 100644 --- a/airflow/example_dags/example_trigger_target_dag.py +++ b/airflow/example_dags/example_trigger_target_dag.py @@ -43,7 +43,7 @@ def run_this_func(**context): :param context: The execution context :type context: dict """ - print("Remotely received value of {} for key=message".format(context["dag_run"].conf["message"])) + print(f"Remotely received value of {context['dag_run'].conf['message']} for key=message") run_this = PythonOperator(task_id="run_this", python_callable=run_this_func, dag=dag) diff --git a/airflow/example_dags/subdags/subdag.py b/airflow/example_dags/subdags/subdag.py index 6a30415..849b294 100644 --- a/airflow/example_dags/subdags/subdag.py +++ b/airflow/example_dags/subdags/subdag.py @@ -43,7 +43,7 @@ def subdag(parent_dag_name, child_dag_name, args): for i in range(5): DummyOperator( - task_id='{}-task-{}'.format(child_dag_name, i + 1), + task_id=f'{child_dag_name}-task-{i + 1}', default_args=args, dag=dag_subdag, ) diff --git a/airflow/example_dags/tutorial_taskflow_api_etl.py b/airflow/example_dags/tutorial_taskflow_api_etl.py index e50ae5f..cfcfbd9 100644 --- a/airflow/example_dags/tutorial_taskflow_api_etl.py +++ b/airflow/example_dags/tutorial_taskflow_api_etl.py @@ -91,7 +91,7 @@ def tutorial_taskflow_api_etl(): instead of saving it to end user review, just prints it out. 
""" - print("Total order value is: %.2f" % total_order_value) + print(f"Total order value is: {total_order_value:.2f}") # [END load] diff --git a/airflow/hooks/dbapi.py b/airflow/hooks/dbapi.py index 9e340e4..9821643 100644 --- a/airflow/hooks/dbapi.py +++ b/airflow/hooks/dbapi.py @@ -248,7 +248,7 @@ class DbApiHook(BaseHook): sql = "INSERT INTO " else: sql = "REPLACE INTO " - sql += "{} {} VALUES ({})".format(table, target_fields, ",".join(placeholders)) + sql += f"{table} {target_fields} VALUES ({','.join(placeholders)})" return sql def insert_rows(self, table, rows, target_fields=None, commit_every=1000, replace=False, **kwargs): diff --git a/airflow/kubernetes/refresh_config.py b/airflow/kubernetes/refresh_config.py index 0004cac..023cd32 100644 --- a/airflow/kubernetes/refresh_config.py +++ b/airflow/kubernetes/refresh_config.py @@ -61,7 +61,7 @@ class RefreshKubeConfigLoader(KubeConfigLoader): if 'token' not in status: logging.error('exec: missing token field in plugin output') return None - self.token = "Bearer %s" % status['token'] # pylint: disable=W0201 + self.token = f"Bearer {status['token']}" # pylint: disable=W0201 ts_str = status.get('expirationTimestamp') if ts_str: self.api_key_expire_ts = _parse_timestamp(ts_str) diff --git a/airflow/models/connection.py b/airflow/models/connection.py index 1159a44..4edd6b7 100644 --- a/airflow/models/connection.py +++ b/airflow/models/connection.py @@ -165,7 +165,7 @@ class Connection(Base, LoggingMixin): # pylint: disable=too-many-instance-attri def get_uri(self) -> str: """Return connection in URI format""" - uri = '{}://'.format(str(self.conn_type).lower().replace('_', '-')) + uri = f"{str(self.conn_type).lower().replace('_', '-')}://" authority_block = '' if self.login is not None: @@ -190,12 +190,12 @@ class Connection(Base, LoggingMixin): # pylint: disable=too-many-instance-attri host_block += f'@:{self.port}' if self.schema: - host_block += '/{}'.format(quote(self.schema, safe='')) + host_block += 
f"/{quote(self.schema, safe='')}" uri += host_block if self.extra_dejson: - uri += '?{}'.format(urlencode(self.extra_dejson)) + uri += f'?{urlencode(self.extra_dejson)}' return uri diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 8bb32db..15332f3 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -1538,7 +1538,7 @@ class DAG(LoggingMixin): dttm = timezone.utcnow() pickled = pickle.dumps(self) d['pickle_len'] = len(pickled) - d['pickling_duration'] = "{}".format(timezone.utcnow() - dttm) + d['pickling_duration'] = str(timezone.utcnow() - dttm) except Exception as e: self.log.debug(e) d['is_picklable'] = False diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index d671a01..3c9f53a 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -1595,9 +1595,7 @@ class TaskInstance(Base, LoggingMixin): # pylint: disable=R0902,R0904 yesterday_ds_nodash = yesterday_ds.replace('-', '') tomorrow_ds_nodash = tomorrow_ds.replace('-', '') - ti_key_str = "{dag_id}__{task_id}__{ds_nodash}".format( - dag_id=task.dag_id, task_id=task.task_id, ds_nodash=ds_nodash - ) + ti_key_str = f"{task.dag_id}__{task.task_id}__{ds_nodash}" if task.params: params.update(task.params) diff --git a/airflow/models/xcom.py b/airflow/models/xcom.py index 6428a40..844e38f 100644 --- a/airflow/models/xcom.py +++ b/airflow/models/xcom.py @@ -71,9 +71,7 @@ class BaseXCom(Base, LoggingMixin): self.value = pickle.loads(self.value) def __repr__(self): - return '<XCom "{key}" ({task_id} @ {execution_date})>'.format( - key=self.key, task_id=self.task_id, execution_date=self.execution_date - ) + return f'<XCom "{self.key}" ({self.task_id} @ {self.execution_date})>' @classmethod @provide_session diff --git a/airflow/operators/sql.py b/airflow/operators/sql.py index 59c2e60..00f7e13 100644 --- a/airflow/operators/sql.py +++ b/airflow/operators/sql.py @@ -293,9 +293,7 @@ class SQLIntervalCheckOperator(BaseOperator): self.days_back = 
-abs(days_back) self.conn_id = conn_id sqlexp = ", ".join(self.metrics_sorted) - sqlt = "SELECT {sqlexp} FROM {table} WHERE {date_filter_column}=".format( - sqlexp=sqlexp, table=table, date_filter_column=date_filter_column - ) + sqlt = f"SELECT {sqlexp} FROM {table} WHERE {date_filter_column}=" self.sql1 = sqlt + "'{{ ds }}'" self.sql2 = sqlt + "'{{ macros.ds_add(ds, " + str(self.days_back) + ") }}'" @@ -360,9 +358,7 @@ class SQLIntervalCheckOperator(BaseOperator): ratios[k], self.metrics_thresholds[k], ) - raise AirflowException( - "The following tests have failed:\n {}".format(", ".join(sorted(failed_tests))) - ) + raise AirflowException(f"The following tests have failed:\n {', '.join(sorted(failed_tests))}") self.log.info("All tests have passed") @@ -535,7 +531,7 @@ class BranchSQLOperator(BaseOperator, SkipMixin): self._hook = self._get_hook() if self._hook is None: - raise AirflowException("Failed to establish connection to '%s'" % self.conn_id) + raise AirflowException(f"Failed to establish connection to '{self.conn_id}'") if self.sql is None: raise AirflowException("Expected 'sql' parameter is missing.") @@ -584,14 +580,14 @@ class BranchSQLOperator(BaseOperator, SkipMixin): follow_branch = self.follow_task_ids_if_true else: raise AirflowException( - "Unexpected query return result '{}' type '{}'".format(query_result, type(query_result)) + f"Unexpected query return result '{query_result}' type '{type(query_result)}'" ) if follow_branch is None: follow_branch = self.follow_task_ids_if_false except ValueError: raise AirflowException( - "Unexpected query return result '{}' type '{}'".format(query_result, type(query_result)) + f"Unexpected query return result '{query_result}' type '{type(query_result)}'" ) self.skip_all_except(context["ti"], follow_branch) diff --git a/airflow/providers/amazon/aws/hooks/datasync.py b/airflow/providers/amazon/aws/hooks/datasync.py index 4925529..3c82749 100644 --- a/airflow/providers/amazon/aws/hooks/datasync.py +++ 
b/airflow/providers/amazon/aws/hooks/datasync.py @@ -57,7 +57,7 @@ class AWSDataSyncHook(AwsBaseHook): self.tasks: list = [] # wait_interval_seconds = 0 is used during unit tests if wait_interval_seconds < 0 or wait_interval_seconds > 15 * 60: - raise ValueError("Invalid wait_interval_seconds %s" % wait_interval_seconds) + raise ValueError(f"Invalid wait_interval_seconds {wait_interval_seconds}") self.wait_interval_seconds = wait_interval_seconds def create_location(self, location_uri: str, **create_location_kwargs) -> str: @@ -314,4 +314,4 @@ class AWSDataSyncHook(AwsBaseHook): return False if iterations <= 0: raise AirflowTaskTimeout("Max iterations exceeded!") - raise AirflowException("Unknown status: %s" % status) # Should never happen + raise AirflowException(f"Unknown status: {status}") # Should never happen diff --git a/airflow/providers/amazon/aws/hooks/dynamodb.py b/airflow/providers/amazon/aws/hooks/dynamodb.py index a829f8d..a66f2b0 100644 --- a/airflow/providers/amazon/aws/hooks/dynamodb.py +++ b/airflow/providers/amazon/aws/hooks/dynamodb.py @@ -58,6 +58,4 @@ class AwsDynamoDBHook(AwsBaseHook): batch.put_item(Item=item) return True except Exception as general_error: - raise AirflowException( - "Failed to insert items in dynamodb, error: {error}".format(error=str(general_error)) - ) + raise AirflowException(f"Failed to insert items in dynamodb, error: {str(general_error)}") diff --git a/airflow/providers/amazon/aws/hooks/sagemaker.py b/airflow/providers/amazon/aws/hooks/sagemaker.py index ab5fdd1..d6548ad 100644 --- a/airflow/providers/amazon/aws/hooks/sagemaker.py +++ b/airflow/providers/amazon/aws/hooks/sagemaker.py @@ -126,7 +126,7 @@ def secondary_training_status_message( for transition in transitions_to_print: message = transition['StatusMessage'] time_str = timezone.convert_to_utc(job_description['LastModifiedTime']).strftime('%Y-%m-%d %H:%M:%S') - status_strs.append('{} {} - {}'.format(time_str, transition['Status'], message)) + 
status_strs.append(f"{time_str} {transition['Status']} - {message}") return '\n'.join(status_strs) @@ -740,7 +740,7 @@ class SageMakerHook(AwsBaseHook): # pylint: disable=too-many-public-methods if status in non_terminal_states: running = True elif status in self.failed_states: - raise AirflowException('SageMaker job failed because %s' % response['FailureReason']) + raise AirflowException(f"SageMaker job failed because {response['FailureReason']}") else: running = False diff --git a/airflow/providers/amazon/aws/log/s3_task_handler.py b/airflow/providers/amazon/aws/log/s3_task_handler.py index d6e5326..7fdeac3 100644 --- a/airflow/providers/amazon/aws/log/s3_task_handler.py +++ b/airflow/providers/amazon/aws/log/s3_task_handler.py @@ -118,7 +118,7 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin): log_exists = self.s3_log_exists(remote_loc) except Exception as error: # pylint: disable=broad-except self.log.exception(error) - log = '*** Failed to verify remote log exists {}.\n{}\n'.format(remote_loc, str(error)) + log = f'*** Failed to verify remote log exists {remote_loc}.\n{str(error)}\n' if log_exists: # If S3 remote file exists, we do not fetch logs from task instance diff --git a/airflow/providers/amazon/aws/operators/datasync.py b/airflow/providers/amazon/aws/operators/datasync.py index fab6898..f5c129a 100644 --- a/airflow/providers/amazon/aws/operators/datasync.py +++ b/airflow/providers/amazon/aws/operators/datasync.py @@ -351,7 +351,7 @@ class AWSDataSyncOperator(BaseOperator): self.log.log(level, '%s=%s', k, v) if not result: - raise AirflowException("Failed TaskExecutionArn %s" % self.task_execution_arn) + raise AirflowException(f"Failed TaskExecutionArn {self.task_execution_arn}") def on_kill(self) -> None: """Cancel the submitted DataSync task.""" diff --git a/airflow/providers/amazon/aws/operators/emr_add_steps.py b/airflow/providers/amazon/aws/operators/emr_add_steps.py index 44bc20c..2ffd5cc 100644 --- 
a/airflow/providers/amazon/aws/operators/emr_add_steps.py +++ b/airflow/providers/amazon/aws/operators/emr_add_steps.py @@ -100,7 +100,7 @@ class EmrAddStepsOperator(BaseOperator): response = emr.add_job_flow_steps(JobFlowId=job_flow_id, Steps=steps) if not response['ResponseMetadata']['HTTPStatusCode'] == 200: - raise AirflowException('Adding steps failed: %s' % response) + raise AirflowException(f'Adding steps failed: {response}') else: self.log.info('Steps %s added to JobFlow', response['StepIds']) return response['StepIds'] diff --git a/airflow/providers/amazon/aws/operators/emr_create_job_flow.py b/airflow/providers/amazon/aws/operators/emr_create_job_flow.py index b3b6808..d8dc31e 100644 --- a/airflow/providers/amazon/aws/operators/emr_create_job_flow.py +++ b/airflow/providers/amazon/aws/operators/emr_create_job_flow.py @@ -78,7 +78,7 @@ class EmrCreateJobFlowOperator(BaseOperator): response = emr.create_job_flow(job_flow_overrides) if not response['ResponseMetadata']['HTTPStatusCode'] == 200: - raise AirflowException('JobFlow creation failed: %s' % response) + raise AirflowException(f'JobFlow creation failed: {response}') else: self.log.info('JobFlow with id %s created', response['JobFlowId']) return response['JobFlowId'] diff --git a/airflow/providers/amazon/aws/operators/emr_modify_cluster.py b/airflow/providers/amazon/aws/operators/emr_modify_cluster.py index f0e4693..a04e845 100644 --- a/airflow/providers/amazon/aws/operators/emr_modify_cluster.py +++ b/airflow/providers/amazon/aws/operators/emr_modify_cluster.py @@ -66,7 +66,7 @@ class EmrModifyClusterOperator(BaseOperator): ) if response['ResponseMetadata']['HTTPStatusCode'] != 200: - raise AirflowException('Modify cluster failed: %s' % response) + raise AirflowException(f'Modify cluster failed: {response}') else: self.log.info('Steps concurrency level %d', response['StepConcurrencyLevel']) return response['StepConcurrencyLevel'] diff --git 
a/airflow/providers/amazon/aws/operators/emr_terminate_job_flow.py b/airflow/providers/amazon/aws/operators/emr_terminate_job_flow.py index 0e7e17f..9d75eaf 100644 --- a/airflow/providers/amazon/aws/operators/emr_terminate_job_flow.py +++ b/airflow/providers/amazon/aws/operators/emr_terminate_job_flow.py @@ -51,6 +51,6 @@ class EmrTerminateJobFlowOperator(BaseOperator): response = emr.terminate_job_flows(JobFlowIds=[self.job_flow_id]) if not response['ResponseMetadata']['HTTPStatusCode'] == 200: - raise AirflowException('JobFlow termination failed: %s' % response) + raise AirflowException(f'JobFlow termination failed: {response}') else: self.log.info('JobFlow with id %s terminated', self.job_flow_id) diff --git a/airflow/providers/amazon/aws/operators/sagemaker_endpoint.py b/airflow/providers/amazon/aws/operators/sagemaker_endpoint.py index 53cfd93..35b0b11 100644 --- a/airflow/providers/amazon/aws/operators/sagemaker_endpoint.py +++ b/airflow/providers/amazon/aws/operators/sagemaker_endpoint.py @@ -150,7 +150,7 @@ class SageMakerEndpointOperator(SageMakerBaseOperator): ) if response['ResponseMetadata']['HTTPStatusCode'] != 200: - raise AirflowException('Sagemaker endpoint creation failed: %s' % response) + raise AirflowException(f'Sagemaker endpoint creation failed: {response}') else: return { 'EndpointConfig': self.hook.describe_endpoint_config(endpoint_info['EndpointConfigName']), diff --git a/airflow/providers/amazon/aws/operators/sagemaker_endpoint_config.py b/airflow/providers/amazon/aws/operators/sagemaker_endpoint_config.py index bbf2be1..a2add7b 100644 --- a/airflow/providers/amazon/aws/operators/sagemaker_endpoint_config.py +++ b/airflow/providers/amazon/aws/operators/sagemaker_endpoint_config.py @@ -49,6 +49,6 @@ class SageMakerEndpointConfigOperator(SageMakerBaseOperator): self.log.info('Creating SageMaker Endpoint Config %s.', self.config['EndpointConfigName']) response = self.hook.create_endpoint_config(self.config) if 
response['ResponseMetadata']['HTTPStatusCode'] != 200: - raise AirflowException('Sagemaker endpoint config creation failed: %s' % response) + raise AirflowException(f'Sagemaker endpoint config creation failed: {response}') else: return {'EndpointConfig': self.hook.describe_endpoint_config(self.config['EndpointConfigName'])} diff --git a/airflow/providers/amazon/aws/operators/sagemaker_model.py b/airflow/providers/amazon/aws/operators/sagemaker_model.py index 25730ea..0e8cbf4 100644 --- a/airflow/providers/amazon/aws/operators/sagemaker_model.py +++ b/airflow/providers/amazon/aws/operators/sagemaker_model.py @@ -53,6 +53,6 @@ class SageMakerModelOperator(SageMakerBaseOperator): self.log.info('Creating SageMaker Model %s.', self.config['ModelName']) response = self.hook.create_model(self.config) if response['ResponseMetadata']['HTTPStatusCode'] != 200: - raise AirflowException('Sagemaker model creation failed: %s' % response) + raise AirflowException(f'Sagemaker model creation failed: {response}') else: return {'Model': self.hook.describe_model(self.config['ModelName'])} diff --git a/airflow/providers/amazon/aws/operators/sagemaker_processing.py b/airflow/providers/amazon/aws/operators/sagemaker_processing.py index e56a987..271b46b 100644 --- a/airflow/providers/amazon/aws/operators/sagemaker_processing.py +++ b/airflow/providers/amazon/aws/operators/sagemaker_processing.py @@ -119,5 +119,5 @@ class SageMakerProcessingOperator(SageMakerBaseOperator): max_ingestion_time=self.max_ingestion_time, ) if response['ResponseMetadata']['HTTPStatusCode'] != 200: - raise AirflowException('Sagemaker Processing Job creation failed: %s' % response) + raise AirflowException(f'Sagemaker Processing Job creation failed: {response}') return {'Processing': self.hook.describe_processing_job(self.config['ProcessingJobName'])} diff --git a/airflow/providers/amazon/aws/operators/sagemaker_training.py b/airflow/providers/amazon/aws/operators/sagemaker_training.py index 29c34f6..7d9eaf2 
100644 --- a/airflow/providers/amazon/aws/operators/sagemaker_training.py +++ b/airflow/providers/amazon/aws/operators/sagemaker_training.py @@ -117,6 +117,6 @@ class SageMakerTrainingOperator(SageMakerBaseOperator): max_ingestion_time=self.max_ingestion_time, ) if response['ResponseMetadata']['HTTPStatusCode'] != 200: - raise AirflowException('Sagemaker Training Job creation failed: %s' % response) + raise AirflowException(f'Sagemaker Training Job creation failed: {response}') else: return {'Training': self.hook.describe_training_job(self.config['TrainingJobName'])} diff --git a/airflow/providers/amazon/aws/operators/sagemaker_transform.py b/airflow/providers/amazon/aws/operators/sagemaker_transform.py index 7caf9f1..b264d2d 100644 --- a/airflow/providers/amazon/aws/operators/sagemaker_transform.py +++ b/airflow/providers/amazon/aws/operators/sagemaker_transform.py @@ -116,7 +116,7 @@ class SageMakerTransformOperator(SageMakerBaseOperator): max_ingestion_time=self.max_ingestion_time, ) if response['ResponseMetadata']['HTTPStatusCode'] != 200: - raise AirflowException('Sagemaker transform Job creation failed: %s' % response) + raise AirflowException(f'Sagemaker transform Job creation failed: {response}') else: return { 'Model': self.hook.describe_model(transform_config['ModelName']), diff --git a/airflow/providers/amazon/aws/operators/sagemaker_tuning.py b/airflow/providers/amazon/aws/operators/sagemaker_tuning.py index f8df36a..38664a8 100644 --- a/airflow/providers/amazon/aws/operators/sagemaker_tuning.py +++ b/airflow/providers/amazon/aws/operators/sagemaker_tuning.py @@ -92,6 +92,6 @@ class SageMakerTuningOperator(SageMakerBaseOperator): max_ingestion_time=self.max_ingestion_time, ) if response['ResponseMetadata']['HTTPStatusCode'] != 200: - raise AirflowException('Sagemaker Tuning Job creation failed: %s' % response) + raise AirflowException(f'Sagemaker Tuning Job creation failed: {response}') else: return {'Tuning': 
self.hook.describe_tuning_job(self.config['HyperParameterTuningJobName'])} diff --git a/airflow/providers/amazon/aws/sensors/sagemaker_base.py b/airflow/providers/amazon/aws/sensors/sagemaker_base.py index 16c8cd7..6572122 100644 --- a/airflow/providers/amazon/aws/sensors/sagemaker_base.py +++ b/airflow/providers/amazon/aws/sensors/sagemaker_base.py @@ -63,7 +63,7 @@ class SageMakerBaseSensor(BaseSensorOperator): if state in self.failed_states(): failed_reason = self.get_failed_reason_from_response(response) - raise AirflowException('Sagemaker job failed for the following reason: %s' % failed_reason) + raise AirflowException(f'Sagemaker job failed for the following reason: {failed_reason}') return True def non_terminal_states(self) -> Set[str]: diff --git a/airflow/providers/apache/druid/hooks/druid.py b/airflow/providers/apache/druid/hooks/druid.py index b4af207..69d33a1 100644 --- a/airflow/providers/apache/druid/hooks/druid.py +++ b/airflow/providers/apache/druid/hooks/druid.py @@ -68,9 +68,7 @@ class DruidHook(BaseHook): port = conn.port conn_type = 'http' if not conn.conn_type else conn.conn_type endpoint = conn.extra_dejson.get('endpoint', '') - return "{conn_type}://{host}:{port}/{endpoint}".format( - conn_type=conn_type, host=host, port=port, endpoint=endpoint - ) + return f"{conn_type}://{host}:{port}/{endpoint}" def get_auth(self) -> Optional[requests.auth.HTTPBasicAuth]: """ diff --git a/airflow/providers/apache/hdfs/sensors/hdfs.py b/airflow/providers/apache/hdfs/sensors/hdfs.py index 65cfb5b..867c193 100644 --- a/airflow/providers/apache/hdfs/sensors/hdfs.py +++ b/airflow/providers/apache/hdfs/sensors/hdfs.py @@ -141,7 +141,7 @@ class HdfsRegexSensor(HdfsSensor): result = [ f for f in sb_client.ls([self.filepath], include_toplevel=False) - if f['file_type'] == 'f' and self.regex.match(f['path'].replace('%s/' % self.filepath, '')) + if f['file_type'] == 'f' and self.regex.match(f['path'].replace(f'{self.filepath}/', '')) ] result = 
self.filter_for_ignored_ext(result, self.ignored_ext, self.ignore_copying) result = self.filter_for_filesize(result, self.file_size) diff --git a/airflow/providers/apache/hive/hooks/hive.py b/airflow/providers/apache/hive/hooks/hive.py index ab7b7b7..41f5cff 100644 --- a/airflow/providers/apache/hive/hooks/hive.py +++ b/airflow/providers/apache/hive/hooks/hive.py @@ -133,9 +133,7 @@ class HiveCliHook(BaseHook): if self.use_beeline: hive_bin = 'beeline' - jdbc_url = "jdbc:hive2://{host}:{port}/{schema}".format( - host=conn.host, port=conn.port, schema=conn.schema - ) + jdbc_url = f"jdbc:hive2://{conn.host}:{conn.port}/{conn.schema}" if conf.get('core', 'security') == 'kerberos': template = conn.extra_dejson.get('principal', "hive/_HOST@EXAMPLE.COM") if "_HOST" in template: @@ -143,9 +141,7 @@ class HiveCliHook(BaseHook): proxy_user = self._get_proxy_user() - jdbc_url += ";principal={template};{proxy_user}".format( - template=template, proxy_user=proxy_user - ) + jdbc_url += f";principal={template};{proxy_user}" elif self.auth: jdbc_url += ";auth=" + self.auth diff --git a/airflow/providers/apache/hive/operators/hive_stats.py b/airflow/providers/apache/hive/operators/hive_stats.py index d4de591..88faa40 100644 --- a/airflow/providers/apache/hive/operators/hive_stats.py +++ b/airflow/providers/apache/hive/operators/hive_stats.py @@ -135,9 +135,7 @@ class HiveStatsCollectionOperator(BaseOperator): where_clause_ = [f"{k} = '{v}'" for k, v in self.partition.items()] where_clause = " AND\n ".join(where_clause_) - sql = "SELECT {exprs_str} FROM {table} WHERE {where_clause};".format( - exprs_str=exprs_str, table=self.table, where_clause=where_clause - ) + sql = f"SELECT {exprs_str} FROM {self.table} WHERE {where_clause};" presto = PrestoHook(presto_conn_id=self.presto_conn_id) self.log.info('Executing SQL check: %s', sql) @@ -150,26 +148,22 @@ class HiveStatsCollectionOperator(BaseOperator): self.log.info("Deleting rows from previous runs if they exist") mysql =
MySqlHook(self.mysql_conn_id) - sql = """ + sql = f""" SELECT 1 FROM hive_stats WHERE - table_name='{table}' AND + table_name='{self.table}' AND partition_repr='{part_json}' AND - dttm='{dttm}' + dttm='{self.dttm}' LIMIT 1; - """.format( - table=self.table, part_json=part_json, dttm=self.dttm - ) + """ if mysql.get_records(sql): - sql = """ + sql = f""" DELETE FROM hive_stats WHERE - table_name='{table}' AND + table_name='{self.table}' AND partition_repr='{part_json}' AND - dttm='{dttm}'; - """.format( - table=self.table, part_json=part_json, dttm=self.dttm - ) + dttm='{self.dttm}'; + """ mysql.run(sql) self.log.info("Pivoting and loading cells into the Airflow db") diff --git a/airflow/providers/apache/spark/hooks/spark_jdbc.py b/airflow/providers/apache/spark/hooks/spark_jdbc.py index 7a22c66..f90cea2 100644 --- a/airflow/providers/apache/spark/hooks/spark_jdbc.py +++ b/airflow/providers/apache/spark/hooks/spark_jdbc.py @@ -207,7 +207,7 @@ class SparkJDBCHook(SparkSubmitHook): if self._jdbc_connection['url']: arguments += [ '-url', - "{}{}/{}".format(jdbc_conn['conn_prefix'], jdbc_conn['url'], jdbc_conn['schema']), + f"{jdbc_conn['conn_prefix']}{jdbc_conn['url']}/{jdbc_conn['schema']}", ] if self._jdbc_connection['user']: arguments += ['-user', self._jdbc_connection['user']] diff --git a/airflow/providers/apache/spark/hooks/spark_submit.py b/airflow/providers/apache/spark/hooks/spark_submit.py index ac1a83a..e7f2186 100644 --- a/airflow/providers/apache/spark/hooks/spark_submit.py +++ b/airflow/providers/apache/spark/hooks/spark_submit.py @@ -291,7 +291,7 @@ class SparkSubmitHook(BaseHook, LoggingMixin): connection_cmd += ["--master", self._connection['master']] for key in self._conf: - connection_cmd += ["--conf", "{}={}".format(key, str(self._conf[key]))] + connection_cmd += ["--conf", f"{key}={str(self._conf[key])}"] if self._env_vars and (self._is_kubernetes or self._is_yarn): if self._is_yarn: tmpl = "spark.yarn.appMasterEnv.{}={}" @@ -308,7 +308,7 @@ class 
SparkSubmitHook(BaseHook, LoggingMixin): if self._is_kubernetes and self._connection['namespace']: connection_cmd += [ "--conf", - "spark.kubernetes.namespace={}".format(self._connection['namespace']), + f"spark.kubernetes.namespace={self._connection['namespace']}", ] if self._files: connection_cmd += ["--files", self._files] @@ -378,9 +378,7 @@ class SparkSubmitHook(BaseHook, LoggingMixin): "/usr/bin/curl", "--max-time", str(curl_max_wait_time), - "{host}/v1/submissions/status/{submission_id}".format( - host=spark_host, submission_id=self._driver_id - ), + f"{spark_host}/v1/submissions/status/{self._driver_id}", ] self.log.info(connection_cmd) diff --git a/airflow/providers/cncf/kubernetes/hooks/kubernetes.py b/airflow/providers/cncf/kubernetes/hooks/kubernetes.py index cf27713..ca82918 100644 --- a/airflow/providers/cncf/kubernetes/hooks/kubernetes.py +++ b/airflow/providers/cncf/kubernetes/hooks/kubernetes.py @@ -29,7 +29,7 @@ def _load_body_to_dict(body): try: body_dict = yaml.safe_load(body) except yaml.YAMLError as e: - raise AirflowException("Exception when loading resource definition: %s\n" % e) + raise AirflowException(f"Exception when loading resource definition: {e}\n") return body_dict @@ -169,7 +169,7 @@ class KubernetesHook(BaseHook): self.log.debug("Response: %s", response) return response except client.rest.ApiException as e: - raise AirflowException("Exception when calling -> create_custom_object: %s\n" % e) + raise AirflowException(f"Exception when calling -> create_custom_object: {e}\n") def get_custom_object( self, group: str, version: str, plural: str, name: str, namespace: Optional[str] = None @@ -197,7 +197,7 @@ class KubernetesHook(BaseHook): ) return response except client.rest.ApiException as e: - raise AirflowException("Exception when calling -> get_custom_object: %s\n" % e) + raise AirflowException(f"Exception when calling -> get_custom_object: {e}\n") def get_namespace(self) -> str: """Returns the namespace that defined in the 
connection""" diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py index 3f42ab1..7b4022e 100644 --- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py +++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py @@ -324,8 +324,7 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance- if len(pod_list.items) > 1 and self.reattach_on_restart: raise AirflowException( - 'More than one pod running with labels: ' - '{label_selector}'.format(label_selector=label_selector) + f'More than one pod running with labels: {label_selector}' ) launcher = pod_launcher.PodLauncher(kube_client=client, extract_xcom=self.do_xcom_push) diff --git a/airflow/providers/cncf/kubernetes/sensors/spark_kubernetes.py b/airflow/providers/cncf/kubernetes/sensors/spark_kubernetes.py index 2fa3401..eb555f1 100644 --- a/airflow/providers/cncf/kubernetes/sensors/spark_kubernetes.py +++ b/airflow/providers/cncf/kubernetes/sensors/spark_kubernetes.py @@ -106,7 +106,7 @@ class SparkKubernetesSensor(BaseSensorOperator): if self.attach_log and application_state in self.FAILURE_STATES + self.SUCCESS_STATES: self._log_driver(application_state, response) if application_state in self.FAILURE_STATES: - raise AirflowException("Spark application failed with state: %s" % application_state) + raise AirflowException(f"Spark application failed with state: {application_state}") elif application_state in self.SUCCESS_STATES: self.log.info("Spark application ended successfully") return True diff --git a/airflow/providers/databricks/hooks/databricks.py b/airflow/providers/databricks/hooks/databricks.py index cc0fbc6..28953b9 100644 --- a/airflow/providers/databricks/hooks/databricks.py +++ b/airflow/providers/databricks/hooks/databricks.py @@ -178,7 +178,7 @@ class DatabricksHook(BaseHook): # noqa auth = (self.databricks_conn.login, self.databricks_conn.password) host = 
self.databricks_conn.host - url = 'https://{host}/{endpoint}'.format(host=self._parse_host(host), endpoint=endpoint) + url = f'https://{self._parse_host(host)}/{endpoint}' if method == 'GET': request_func = requests.get diff --git a/airflow/providers/docker/operators/docker_swarm.py b/airflow/providers/docker/operators/docker_swarm.py index 18ecf9c..1098d98 100644 --- a/airflow/providers/docker/operators/docker_swarm.py +++ b/airflow/providers/docker/operators/docker_swarm.py @@ -126,7 +126,7 @@ class DockerSwarmOperator(DockerOperator): restart_policy=types.RestartPolicy(condition='none'), resources=types.Resources(mem_limit=self.mem_limit), ), - name='airflow-%s' % get_random_string(), + name=f'airflow-{get_random_string()}', labels={'name': f'airflow__{self.dag_id}__{self.task_id}'}, ) diff --git a/airflow/providers/ftp/hooks/ftp.py b/airflow/providers/ftp/hooks/ftp.py index 66a9080..6c3c064 100644 --- a/airflow/providers/ftp/hooks/ftp.py +++ b/airflow/providers/ftp/hooks/ftp.py @@ -180,7 +180,7 @@ class FTPHook(BaseHook): remote_path, remote_file_name = os.path.split(remote_full_path) conn.cwd(remote_path) self.log.info('Retrieving file from FTP: %s', remote_full_path) - conn.retrbinary('RETR %s' % remote_file_name, callback) + conn.retrbinary(f'RETR {remote_file_name}', callback) self.log.info('Finished retrieving file from FTP: %s', remote_full_path) if is_path and output_handle: @@ -210,7 +210,7 @@ class FTPHook(BaseHook): input_handle = local_full_path_or_buffer remote_path, remote_file_name = os.path.split(remote_full_path) conn.cwd(remote_path) - conn.storbinary('STOR %s' % remote_file_name, input_handle) + conn.storbinary(f'STOR {remote_file_name}', input_handle) if is_path: input_handle.close() diff --git a/airflow/providers/google/cloud/hooks/bigquery.py b/airflow/providers/google/cloud/hooks/bigquery.py index b7f46d9..0b3c3b3 100644 --- a/airflow/providers/google/cloud/hooks/bigquery.py +++ b/airflow/providers/google/cloud/hooks/bigquery.py @@ -2843,7 
+2843,7 @@ def _split_tablename( cmpt = rest.split('.') if len(cmpt) == 3: if project_id: - raise ValueError("{var}Use either : or . to specify project".format(var=var_print(var_name))) + raise ValueError(f"{var_print(var_name)}Use either : or . to specify project") project_id = cmpt[0] dataset_id = cmpt[1] table_id = cmpt[2] @@ -2887,7 +2887,7 @@ def _cleanse_time_partitioning( def _validate_value(key: Any, value: Any, expected_type: Type) -> None: """Function to check expected type and raise error if type is not correct""" if not isinstance(value, expected_type): - raise TypeError("{} argument must have a type {} not {}".format(key, expected_type, type(value))) + raise TypeError(f"{key} argument must have a type {expected_type} not {type(value)}") def _api_resource_configs_duplication_check( diff --git a/airflow/providers/google/cloud/hooks/compute.py b/airflow/providers/google/cloud/hooks/compute.py index ab84241..c4da00a 100644 --- a/airflow/providers/google/cloud/hooks/compute.py +++ b/airflow/providers/google/cloud/hooks/compute.py @@ -99,9 +99,7 @@ class ComputeEngineHook(GoogleBaseHook): try: operation_name = response["name"] except KeyError: - raise AirflowException( - "Wrong response '{}' returned - it should contain " "'name' field".format(response) - ) + raise AirflowException(f"Wrong response '{response}' returned - it should contain 'name' field") self._wait_for_operation_to_complete(project_id=project_id, operation_name=operation_name, zone=zone) @GoogleBaseHook.fallback_to_default_project_id @@ -130,9 +128,7 @@ class ComputeEngineHook(GoogleBaseHook): try: operation_name = response["name"] except KeyError: - raise AirflowException( - "Wrong response '{}' returned - it should contain " "'name' field".format(response) - ) + raise AirflowException(f"Wrong response '{response}' returned - it should contain 'name' field") self._wait_for_operation_to_complete(project_id=project_id, operation_name=operation_name, zone=zone) 
@GoogleBaseHook.fallback_to_default_project_id @@ -159,9 +155,7 @@ class ComputeEngineHook(GoogleBaseHook): try: operation_name = response["name"] except KeyError: - raise AirflowException( - "Wrong response '{}' returned - it should contain " "'name' field".format(response) - ) + raise AirflowException(f"Wrong response '{response}' returned - it should contain 'name' field") self._wait_for_operation_to_complete(project_id=project_id, operation_name=operation_name, zone=zone) def _execute_set_machine_type(self, zone: str, resource_id: str, body: dict, project_id: str) -> dict: @@ -233,9 +227,7 @@ class ComputeEngineHook(GoogleBaseHook): try: operation_name = response["name"] except KeyError: - raise AirflowException( - "Wrong response '{}' returned - it should contain " "'name' field".format(response) - ) + raise AirflowException(f"Wrong response '{response}' returned - it should contain 'name' field") self._wait_for_operation_to_complete(project_id=project_id, operation_name=operation_name) @GoogleBaseHook.fallback_to_default_project_id @@ -318,9 +310,7 @@ class ComputeEngineHook(GoogleBaseHook): try: operation_name = response["name"] except KeyError: - raise AirflowException( - "Wrong response '{}' returned - it should contain " "'name' field".format(response) - ) + raise AirflowException(f"Wrong response '{response}' returned - it should contain 'name' field") self._wait_for_operation_to_complete(project_id=project_id, operation_name=operation_name, zone=zone) def _wait_for_operation_to_complete( diff --git a/airflow/providers/google/cloud/hooks/dataflow.py b/airflow/providers/google/cloud/hooks/dataflow.py index 0a665d4..da3e49c 100644 --- a/airflow/providers/google/cloud/hooks/dataflow.py +++ b/airflow/providers/google/cloud/hooks/dataflow.py @@ -385,21 +385,19 @@ class _DataflowJobsController(LoggingMixin): if job['currentState'] == DataflowJobStatus.JOB_STATE_DONE: return True elif job['currentState'] == DataflowJobStatus.JOB_STATE_FAILED: - raise 
Exception("Google Cloud Dataflow job {} has failed.".format(job['name'])) + raise Exception(f"Google Cloud Dataflow job {job['name']} has failed.") elif job['currentState'] == DataflowJobStatus.JOB_STATE_CANCELLED: - raise Exception("Google Cloud Dataflow job {} was cancelled.".format(job['name'])) + raise Exception(f"Google Cloud Dataflow job {job['name']} was cancelled.") elif job['currentState'] == DataflowJobStatus.JOB_STATE_DRAINED: - raise Exception("Google Cloud Dataflow job {} was drained.".format(job['name'])) + raise Exception(f"Google Cloud Dataflow job {job['name']} was drained.") elif job['currentState'] == DataflowJobStatus.JOB_STATE_UPDATED: - raise Exception("Google Cloud Dataflow job {} was updated.".format(job['name'])) + raise Exception(f"Google Cloud Dataflow job {job['name']} was updated.") elif job['currentState'] == DataflowJobStatus.JOB_STATE_RUNNING and wait_for_running: return True elif job['currentState'] in DataflowJobStatus.AWAITING_STATES: return self._wait_until_finished is False self.log.debug("Current job: %s", str(job)) - raise Exception( - "Google Cloud Dataflow job {} was unknown state: {}".format(job["name"], job["currentState"]) - ) + raise Exception(f"Google Cloud Dataflow job {job['name']} was unknown state: {job['currentState']}") def wait_for_done(self) -> None: """Helper method to wait for result of submitted job.""" diff --git a/airflow/providers/google/cloud/hooks/functions.py b/airflow/providers/google/cloud/hooks/functions.py index 8cb93df..49a6e2c 100644 --- a/airflow/providers/google/cloud/hooks/functions.py +++ b/airflow/providers/google/cloud/hooks/functions.py @@ -216,9 +216,7 @@ class CloudFunctionsHook(GoogleBaseHook): :type project_id: str :return: None """ - name = "projects/{project_id}/locations/{location}/functions/{function_id}".format( - project_id=project_id, location=location, function_id=function_id - ) + name = f"projects/{project_id}/locations/{location}/functions/{function_id}" # fmt: off response = 
self.get_conn().projects().locations().functions().call( # pylint: disable=no-member name=name, diff --git a/airflow/providers/google/cloud/hooks/kubernetes_engine.py b/airflow/providers/google/cloud/hooks/kubernetes_engine.py index 930c1cd..604815d 100644 --- a/airflow/providers/google/cloud/hooks/kubernetes_engine.py +++ b/airflow/providers/google/cloud/hooks/kubernetes_engine.py @@ -99,7 +99,7 @@ class GKEHook(GoogleBaseHook): if operation.status == Operation.Status.RUNNING or operation.status == Operation.Status.PENDING: time.sleep(OPERATIONAL_POLL_INTERVAL) else: - raise exceptions.GoogleCloudError("Operation has failed with status: %s" % operation.status) + raise exceptions.GoogleCloudError(f"Operation has failed with status: {operation.status}") # To update status of operation operation = self.get_operation(operation.name, project_id=project_id or self.project_id) return operation diff --git a/airflow/providers/google/cloud/log/gcs_task_handler.py b/airflow/providers/google/cloud/log/gcs_task_handler.py index 9fd456d..b57a18a 100644 --- a/airflow/providers/google/cloud/log/gcs_task_handler.py +++ b/airflow/providers/google/cloud/log/gcs_task_handler.py @@ -156,7 +156,7 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin): log = f'*** Reading remote log from {remote_loc}.\n{remote_log}\n' return log, {'end_of_log': True} except Exception as e: # pylint: disable=broad-except - log = '*** Unable to read remote log from {}\n*** {}\n\n'.format(remote_loc, str(e)) + log = f'*** Unable to read remote log from {remote_loc}\n*** {str(e)}\n\n' self.log.error(log) local_log, metadata = super()._read(ti, try_number) log += local_log @@ -178,7 +178,7 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin): log = '\n'.join([old_log, log]) if old_log else log except Exception as e: # pylint: disable=broad-except if not hasattr(e, 'resp') or e.resp.get('status') != '404': # pylint: disable=no-member - log = '*** Previous log discarded: {}\n\n'.format(str(e)) + log + log = 
f'*** Previous log discarded: {str(e)}\n\n' + log self.log.info("Previous log discarded: %s", e) try: diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py index ea2c9d3..28956ec 100644 --- a/airflow/providers/google/cloud/operators/bigquery.py +++ b/airflow/providers/google/cloud/operators/bigquery.py @@ -729,9 +729,7 @@ class BigQueryExecuteQueryOperator(BaseOperator): for s in self.sql ] else: - raise AirflowException( - "argument 'sql' of type {} is neither a string nor an iterable".format(type(str)) - ) + raise AirflowException(f"argument 'sql' of type {type(str)} is neither a string nor an iterable") context['task_instance'].xcom_push(key='job_id', value=job_id) def on_kill(self) -> None: diff --git a/airflow/providers/google/cloud/operators/dataproc.py b/airflow/providers/google/cloud/operators/dataproc.py index ac93915..3bcb0aa 100644 --- a/airflow/providers/google/cloud/operators/dataproc.py +++ b/airflow/providers/google/cloud/operators/dataproc.py @@ -1455,7 +1455,7 @@ class DataprocSubmitPySparkJobOperator(DataprocJobBaseOperator): @staticmethod def _generate_temp_filename(filename): date = time.strftime('%Y%m%d%H%M%S') - return "{}_{}_{}".format(date, str(uuid.uuid4())[:8], ntpath.basename(filename)) + return f"{date}_{str(uuid.uuid4())[:8]}_{ntpath.basename(filename)}" def _upload_file_temp(self, bucket, local_file): """Upload a local file to a Google Cloud Storage bucket.""" diff --git a/airflow/providers/jenkins/operators/jenkins_job_trigger.py b/airflow/providers/jenkins/operators/jenkins_job_trigger.py index 69d0f63..91b8ca9 100644 --- a/airflow/providers/jenkins/operators/jenkins_job_trigger.py +++ b/airflow/providers/jenkins/operators/jenkins_job_trigger.py @@ -54,7 +54,7 @@ def jenkins_request_with_headers(jenkins_server: Jenkins, req: Request) -> Optio response_headers = response.headers if response_body is None: raise jenkins.EmptyResponseException( - "Error communicating with 
server[%s]: empty response" % jenkins_server.server + f"Error communicating with server[{jenkins_server.server}]: empty response" ) return {'body': response_body.decode('utf-8'), 'headers': response_headers} except HTTPError as e: @@ -66,9 +66,9 @@ def jenkins_request_with_headers(jenkins_server: Jenkins, req: Request) -> Optio else: raise except socket.timeout as e: - raise jenkins.TimeoutException('Error in request: %s' % e) + raise jenkins.TimeoutException(f'Error in request: {e}') except URLError as e: - raise JenkinsException('Error in request: %s' % e.reason) + raise JenkinsException(f'Error in request: {e.reason}') return None diff --git a/airflow/providers/jira/hooks/jira.py b/airflow/providers/jira/hooks/jira.py index 8951c12..5d186b8 100644 --- a/airflow/providers/jira/hooks/jira.py +++ b/airflow/providers/jira/hooks/jira.py @@ -82,8 +82,8 @@ class JiraHook(BaseHook): proxies=self.proxies, ) except JIRAError as jira_error: - raise AirflowException('Failed to create jira client, jira error: %s' % str(jira_error)) + raise AirflowException(f'Failed to create jira client, jira error: {str(jira_error)}') except Exception as e: - raise AirflowException('Failed to create jira client, error: %s' % str(e)) + raise AirflowException(f'Failed to create jira client, error: {str(e)}') return self.client diff --git a/airflow/providers/jira/operators/jira.py b/airflow/providers/jira/operators/jira.py index 3550d1f..1e9530c 100644 --- a/airflow/providers/jira/operators/jira.py +++ b/airflow/providers/jira/operators/jira.py @@ -89,6 +89,6 @@ class JiraOperator(BaseOperator): return jira_result except JIRAError as jira_error: - raise AirflowException("Failed to execute jiraOperator, error: %s" % str(jira_error)) + raise AirflowException(f"Failed to execute jiraOperator, error: {str(jira_error)}") except Exception as e: - raise AirflowException("Jira operator error: %s" % str(e)) + raise AirflowException(f"Jira operator error: {str(e)}") diff --git 
a/airflow/providers/microsoft/azure/operators/azure_container_instances.py b/airflow/providers/microsoft/azure/operators/azure_container_instances.py index 74418ac..70355ed 100644 --- a/airflow/providers/microsoft/azure/operators/azure_container_instances.py +++ b/airflow/providers/microsoft/azure/operators/azure_container_instances.py @@ -278,7 +278,7 @@ class AzureContainerInstancesOperator(BaseOperator): self.log.info("Container had exit code: %s", exit_code) if exit_code != 0: - raise AirflowException("Container had a non-zero exit code, %s" % exit_code) + raise AirflowException(f"Container had a non-zero exit code, {exit_code}") return exit_code except CloudError: diff --git a/airflow/providers/microsoft/winrm/operators/winrm.py b/airflow/providers/microsoft/winrm/operators/winrm.py index 5500e3d..55d65cf 100644 --- a/airflow/providers/microsoft/winrm/operators/winrm.py +++ b/airflow/providers/microsoft/winrm/operators/winrm.py @@ -129,7 +129,7 @@ class WinRMOperator(BaseOperator): self.winrm_hook.winrm_protocol.close_shell(winrm_client) # type: ignore[attr-defined] except Exception as e: - raise AirflowException("WinRM operator error: {}".format(str(e))) + raise AirflowException(f"WinRM operator error: {str(e)}") if return_code == 0: # returning output if do_xcom_push is set diff --git a/airflow/providers/mysql/hooks/mysql.py b/airflow/providers/mysql/hooks/mysql.py index 5da0be2..7d1f71f 100644 --- a/airflow/providers/mysql/hooks/mysql.py +++ b/airflow/providers/mysql/hooks/mysql.py @@ -164,12 +164,10 @@ class MySqlHook(DbApiHook): conn = self.get_conn() cur = conn.cursor() cur.execute( - """ + f""" LOAD DATA LOCAL INFILE '{tmp_file}' INTO TABLE {table} - """.format( - tmp_file=tmp_file, table=table - ) + """ ) conn.commit() @@ -178,12 +176,10 @@ class MySqlHook(DbApiHook): conn = self.get_conn() cur = conn.cursor() cur.execute( - """ + f""" SELECT * INTO OUTFILE '{tmp_file}' FROM {table} - """.format( - tmp_file=tmp_file, table=table - ) + """ ) 
conn.commit() @@ -251,17 +247,12 @@ class MySqlHook(DbApiHook): cursor = conn.cursor() cursor.execute( - """ + f""" LOAD DATA LOCAL INFILE '{tmp_file}' {duplicate_key_handling} INTO TABLE {table} {extra_options} - """.format( - tmp_file=tmp_file, - table=table, - duplicate_key_handling=duplicate_key_handling, - extra_options=extra_options, - ) + """ ) cursor.close() diff --git a/airflow/providers/opsgenie/hooks/opsgenie_alert.py b/airflow/providers/opsgenie/hooks/opsgenie_alert.py index aad834c..60f1734 100644 --- a/airflow/providers/opsgenie/hooks/opsgenie_alert.py +++ b/airflow/providers/opsgenie/hooks/opsgenie_alert.py @@ -82,5 +82,5 @@ class OpsgenieAlertHook(HttpHook): return self.run( endpoint='v2/alerts', data=json.dumps(payload), - headers={'Content-Type': 'application/json', 'Authorization': 'GenieKey %s' % api_key}, + headers={'Content-Type': 'application/json', 'Authorization': f'GenieKey {api_key}'}, ) diff --git a/airflow/providers/oracle/hooks/oracle.py b/airflow/providers/oracle/hooks/oracle.py index bfb40a4..1c5c536 100644 --- a/airflow/providers/oracle/hooks/oracle.py +++ b/airflow/providers/oracle/hooks/oracle.py @@ -178,7 +178,7 @@ class OracleHook(DbApiHook): else: lst.append(str(cell)) values = tuple(lst) - sql = 'INSERT /*+ APPEND */ INTO {} {} VALUES ({})'.format(table, target_fields, ','.join(values)) + sql = f"INSERT /*+ APPEND */ INTO {table} {target_fields} VALUES ({','.join(values)})" cur.execute(sql) if i % commit_every == 0: conn.commit() # type: ignore[attr-defined] diff --git a/airflow/providers/pagerduty/hooks/pagerduty.py b/airflow/providers/pagerduty/hooks/pagerduty.py index 4de43df..e42357a 100644 --- a/airflow/providers/pagerduty/hooks/pagerduty.py +++ b/airflow/providers/pagerduty/hooks/pagerduty.py @@ -145,7 +145,7 @@ class PagerdutyHook(BaseHook): actions = ('trigger', 'acknowledge', 'resolve') if action not in actions: - raise ValueError("Event action must be one of: %s" % ', '.join(actions)) + raise ValueError(f"Event 
action must be one of: {', '.join(actions)}") data = { "event_action": action, "payload": payload, diff --git a/airflow/providers/plexus/operators/job.py b/airflow/providers/plexus/operators/job.py index ece5df6..8f56987 100644 --- a/airflow/providers/plexus/operators/job.py +++ b/airflow/providers/plexus/operators/job.py @@ -149,9 +149,7 @@ class PlexusJobOperator(BaseOperator): """ missing_params = self.required_params - set(self.job_params) if len(missing_params) > 0: - raise AirflowException( - "Missing the following required job_params: {}".format(", ".join(missing_params)) - ) + raise AirflowException(f"Missing the following required job_params: {', '.join(missing_params)}") params = {} for prm in self.job_params: if prm in self.lookups: diff --git a/airflow/providers/postgres/hooks/postgres.py b/airflow/providers/postgres/hooks/postgres.py index 0f38823..cd80bd9 100644 --- a/airflow/providers/postgres/hooks/postgres.py +++ b/airflow/providers/postgres/hooks/postgres.py @@ -223,7 +223,7 @@ class PostgresHook(DbApiHook): else: target_fields_fragment = '' - sql = "INSERT INTO {} {} VALUES ({})".format(table, target_fields_fragment, ",".join(placeholders)) + sql = f"INSERT INTO {table} {target_fields_fragment} VALUES ({','.join(placeholders)})" if replace: if target_fields is None: diff --git a/airflow/providers/qubole/hooks/qubole.py b/airflow/providers/qubole/hooks/qubole.py index 0cc3832..6655de5 100644 --- a/airflow/providers/qubole/hooks/qubole.py +++ b/airflow/providers/qubole/hooks/qubole.py @@ -262,7 +262,7 @@ class QuboleHook(BaseHook): for key, value in self.kwargs.items(): # pylint: disable=too-many-nested-blocks if key in COMMAND_ARGS[cmd_type]: if key in HYPHEN_ARGS: - args.append("--{}={}".format(key.replace('_', '-'), value)) + args.append(f"--{key.replace('_', '-')}={value}") elif key in positional_args_list: inplace_args = value elif key == 'tags': @@ -273,7 +273,7 @@ class QuboleHook(BaseHook): else: args.append(f"--{key}={value}") - 
args.append("--tags={}".format(','.join(filter(None, tags)))) + args.append(f"--tags={','.join(filter(None, tags))}") if inplace_args is not None: args += inplace_args.split(' ') diff --git a/airflow/providers/salesforce/hooks/salesforce.py b/airflow/providers/salesforce/hooks/salesforce.py index aeaef5d..d76baac 100644 --- a/airflow/providers/salesforce/hooks/salesforce.py +++ b/airflow/providers/salesforce/hooks/salesforce.py @@ -146,7 +146,7 @@ class SalesforceHook(BaseHook): :return: all instances of the object from Salesforce. :rtype: dict """ - query = "SELECT {} FROM {}".format(",".join(fields), obj) + query = f"SELECT {','.join(fields)} FROM {obj}" self.log.info( "Making query to Salesforce: %s", diff --git a/airflow/providers/sftp/operators/sftp.py b/airflow/providers/sftp/operators/sftp.py index 39cc14b..f137352 100644 --- a/airflow/providers/sftp/operators/sftp.py +++ b/airflow/providers/sftp/operators/sftp.py @@ -154,7 +154,7 @@ class SFTPOperator(BaseOperator): sftp_client.put(self.local_filepath, self.remote_filepath, confirm=self.confirm) except Exception as e: - raise AirflowException("Error while transferring {}, error: {}".format(file_msg, str(e))) + raise AirflowException(f"Error while transferring {file_msg}, error: {str(e)}") return self.local_filepath diff --git a/airflow/providers/ssh/operators/ssh.py b/airflow/providers/ssh/operators/ssh.py index 14cec96..32ac1f2 100644 --- a/airflow/providers/ssh/operators/ssh.py +++ b/airflow/providers/ssh/operators/ssh.py @@ -168,7 +168,7 @@ class SSHOperator(BaseOperator): raise AirflowException(f"error running cmd: {self.command}, error: {error_msg}") except Exception as e: - raise AirflowException("SSH operator error: {}".format(str(e))) + raise AirflowException(f"SSH operator error: {str(e)}") return True diff --git a/airflow/security/kerberos.py b/airflow/security/kerberos.py index 87c269d..f1ddfdf 100644 --- a/airflow/security/kerberos.py +++ b/airflow/security/kerberos.py @@ -56,7 +56,7 @@ def 
renew_from_kt(principal: str, keytab: str, exit_on_fail: bool = True): """ # The config is specified in seconds. But we ask for that same amount in # minutes to give ourselves a large renewal buffer. - renewal_lifetime = "%sm" % conf.getint('kerberos', 'reinit_frequency') + renewal_lifetime = f"{conf.getint('kerberos', 'reinit_frequency')}m" cmd_principal = principal or conf.get('kerberos', 'principal').replace("_HOST", socket.getfqdn()) @@ -128,7 +128,7 @@ def perform_krb181_workaround(principal: str): ret = subprocess.call(cmdv, close_fds=True) if ret != 0: - principal = "{}/{}".format(principal or conf.get('kerberos', 'principal'), socket.getfqdn()) + principal = f"{principal or conf.get('kerberos', 'principal')}/{socket.getfqdn()}" princ = principal ccache = conf.get('kerberos', 'principal') log.error( diff --git a/airflow/security/utils.py b/airflow/security/utils.py index e5ceadb..ca203b5 100644 --- a/airflow/security/utils.py +++ b/airflow/security/utils.py @@ -55,7 +55,7 @@ def replace_hostname_pattern(components, host=None): fqdn = host if not fqdn or fqdn == '0.0.0.0': fqdn = get_hostname() - return '{}/{}@{}'.format(components[0], fqdn.lower(), components[2]) + return f'{components[0]}/{fqdn.lower()}@{components[2]}' def get_fqdn(hostname_or_ip=None): diff --git a/airflow/sensors/date_time.py b/airflow/sensors/date_time.py index 9e997bc..4a6f11b 100644 --- a/airflow/sensors/date_time.py +++ b/airflow/sensors/date_time.py @@ -65,7 +65,7 @@ class DateTimeSensor(BaseSensorOperator): self.target_time = target_time else: raise TypeError( - "Expected str or datetime.datetime type for target_time. Got {}".format(type(target_time)) + f"Expected str or datetime.datetime type for target_time. 
Got {type(target_time)}" ) def poke(self, context: Dict) -> bool: diff --git a/airflow/sensors/sql.py b/airflow/sensors/sql.py index 573c7cd..923af6c 100644 --- a/airflow/sensors/sql.py +++ b/airflow/sensors/sql.py @@ -89,7 +89,7 @@ class SqlSensor(BaseSensorOperator): if conn.conn_type not in allowed_conn_type: raise AirflowException( "The connection type is not supported by SqlSensor. " - + "Supported connection types: {}".format(list(allowed_conn_type)) + + f"Supported connection types: {list(allowed_conn_type)}" ) return conn.get_hook() diff --git a/airflow/utils/cli.py b/airflow/utils/cli.py index 68a0b44..600a693 100644 --- a/airflow/utils/cli.py +++ b/airflow/utils/cli.py @@ -265,7 +265,7 @@ def sigquit_handler(sig, frame): # pylint: disable=unused-argument id_to_name = {th.ident: th.name for th in threading.enumerate()} code = [] for thread_id, stack in sys._current_frames().items(): # pylint: disable=protected-access - code.append("\n# Thread: {}({})".format(id_to_name.get(thread_id, ""), thread_id)) + code.append(f"\n# Thread: {id_to_name.get(thread_id, '')}({thread_id})") for filename, line_number, name, line in traceback.extract_stack(stack): code.append(f'File: "{filename}", line {line_number}, in {name}') if line: diff --git a/airflow/utils/code_utils.py b/airflow/utils/code_utils.py index 77cfa42..53b2db1 100644 --- a/airflow/utils/code_utils.py +++ b/airflow/utils/code_utils.py @@ -50,7 +50,7 @@ def get_python_source(x: Any) -> Optional[str]: pass if source_code is None: - source_code = 'No source code available for {}'.format(type(x)) + source_code = f'No source code available for {type(x)}' return source_code diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py index 6f04cbf..7617bda 100644 --- a/airflow/utils/log/file_task_handler.py +++ b/airflow/utils/log/file_task_handler.py @@ -120,7 +120,7 @@ class FileTaskHandler(logging.Handler): log += "".join(file.readlines()) except Exception as e: # pylint: 
disable=broad-except log = f"*** Failed to load local log file: {location}\n" - log += "*** {}\n".format(str(e)) + log += f"*** {str(e)}\n" elif conf.get('core', 'executor') == 'KubernetesExecutor': # pylint: disable=too-many-nested-blocks try: from airflow.kubernetes.kube_client import get_kube_client @@ -158,7 +158,7 @@ class FileTaskHandler(logging.Handler): log += line.decode() except Exception as f: # pylint: disable=broad-except - log += '*** Unable to fetch logs from worker pod {} ***\n{}\n\n'.format(ti.hostname, str(f)) + log += f'*** Unable to fetch logs from worker pod {ti.hostname} ***\n{str(f)}\n\n' else: url = os.path.join("http://{ti.hostname}:{worker_log_server_port}/log", log_relative_path).format( ti=ti, worker_log_server_port=conf.get('celery', 'WORKER_LOG_SERVER_PORT') @@ -180,7 +180,7 @@ class FileTaskHandler(logging.Handler): log += '\n' + response.text except Exception as e: # pylint: disable=broad-except - log += "*** Failed to fetch log file from worker. {}\n".format(str(e)) + log += f"*** Failed to fetch log file from worker. {str(e)}\n" return log, {'end_of_log': True} diff --git a/airflow/utils/timezone.py b/airflow/utils/timezone.py index d302cbe..2b2521a 100644 --- a/airflow/utils/timezone.py +++ b/airflow/utils/timezone.py @@ -109,7 +109,7 @@ def make_aware(value, timezone=None): # Check that we won't overwrite the timezone of an aware datetime. 
if is_localized(value): - raise ValueError("make_aware expects a naive datetime, got %s" % value) + raise ValueError(f"make_aware expects a naive datetime, got {value}") if hasattr(value, 'fold'): # In case of python 3.6 we want to do the same that pendulum does for python3.5 # i.e in case we move clock back we want to schedule the run at the time of the second diff --git a/airflow/www/utils.py b/airflow/www/utils.py index 265a12f..09ebe9a 100644 --- a/airflow/www/utils.py +++ b/airflow/www/utils.py @@ -128,14 +128,14 @@ def generate_pages(current_page, num_of_pages, search=None, status=None, window= is_disabled = 'disabled' if current_page <= 0 else '' output.append( first_node.format( - href_link="?{}".format(get_params(page=0, search=search, status=status)), # noqa + href_link=f"?{get_params(page=0, search=search, status=status)}", # noqa disabled=is_disabled, ) ) page_link = void_link if current_page > 0: - page_link = '?{}'.format(get_params(page=(current_page - 1), search=search, status=status)) + page_link = f'?{get_params(page=current_page - 1, search=search, status=status)}' output.append(previous_node.format(href_link=page_link, disabled=is_disabled)) # noqa @@ -157,7 +157,7 @@ def generate_pages(current_page, num_of_pages, search=None, status=None, window= 'is_active': 'active' if is_current(current_page, page) else '', 'href_link': void_link if is_current(current_page, page) - else '?{}'.format(get_params(page=page, search=search, status=status)), + else f'?{get_params(page=page, search=search, status=status)}', 'page_num': page + 1, } output.append(page_node.format(**vals)) # noqa @@ -167,13 +167,13 @@ def generate_pages(current_page, num_of_pages, search=None, status=None, window= page_link = ( void_link if current_page >= num_of_pages - 1 - else '?{}'.format(get_params(page=current_page + 1, search=search, status=status)) + else f'?{get_params(page=current_page + 1, search=search, status=status)}' ) output.append(next_node.format(href_link=page_link, 
disabled=is_disabled)) # noqa output.append( last_node.format( - href_link="?{}".format(get_params(page=last_page, search=search, status=status)), # noqa + href_link=f"?{get_params(page=last_page, search=search, status=status)}", # noqa disabled=is_disabled, ) ) diff --git a/airflow/www/validators.py b/airflow/www/validators.py index 9699a47..fe35dcb 100644 --- a/airflow/www/validators.py +++ b/airflow/www/validators.py @@ -37,7 +37,7 @@ class GreaterEqualThan(EqualTo): try: other = form[self.fieldname] except KeyError: - raise ValidationError(field.gettext("Invalid field name '%s'." % self.fieldname)) + raise ValidationError(field.gettext(f"Invalid field name '{self.fieldname}'.")) if field.data is None or other.data is None: return @@ -50,7 +50,7 @@ class GreaterEqualThan(EqualTo): message = self.message if message is None: message = field.gettext( - 'Field must be greater than or equal to %(other_label)s.' % message_args + f"Field must be greater than or equal to {message_args['other_label']}." 
) else: message = message % message_args diff --git a/airflow/www/views.py b/airflow/www/views.py index 1c00951..93ffc38 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -1639,7 +1639,7 @@ class Airflow(AirflowBaseView): # noqa: D101 pylint: disable=too-many-public-m new_dag_state = set_dag_run_state_to_failed(dag, execution_date, commit=confirmed) if confirmed: - flash('Marked failed on {} task instances'.format(len(new_dag_state))) + flash(f'Marked failed on {len(new_dag_state)} task instances') return redirect(origin) else: @@ -1668,7 +1668,7 @@ class Airflow(AirflowBaseView): # noqa: D101 pylint: disable=too-many-public-m new_dag_state = set_dag_run_state_to_success(dag, execution_date, commit=confirmed) if confirmed: - flash('Marked success on {} task instances'.format(len(new_dag_state))) + flash(f'Marked success on {len(new_dag_state)} task instances') return redirect(origin) else: @@ -1752,7 +1752,7 @@ class Airflow(AirflowBaseView): # noqa: D101 pylint: disable=too-many-public-m commit=True, ) - flash("Marked {} on {} task instances".format(state, len(altered))) + flash(f"Marked {state} on {len(altered)} task instances") return redirect(origin) to_be_altered = set_state( @@ -3410,10 +3410,7 @@ class DagRunModelView(AirflowModelView): cleared_ti_count += len(tis) models.clear_task_instances(tis, session, dag=dag) - flash( - "{count} dag runs and {altered_ti_count} task instances " - "were cleared".format(count=count, altered_ti_count=cleared_ti_count) - ) + flash(f"{count} dag runs and {cleared_ti_count} task instances were cleared") except Exception: # noqa pylint: disable=broad-except flash('Failed to clear state', 'error') return redirect(self.get_default_url()) @@ -3636,7 +3633,7 @@ class TaskInstanceModelView(AirflowModelView): models.clear_task_instances(task_instances_list, session, dag=dag) session.commit() - flash("{} task instances have been cleared".format(len(task_instances))) + flash(f"{len(task_instances)} task instances have 
been cleared") self.update_redirect() return redirect(self.get_redirect()) except Exception as e: # noqa pylint: disable=broad-except @@ -3651,11 +3648,7 @@ class TaskInstanceModelView(AirflowModelView): for ti in tis: ti.set_state(target_state, session) session.commit() - flash( - "{count} task instances were set to '{target_state}'".format( - count=count, target_state=target_state - ) - ) + flash(f"{count} task instances were set to '{target_state}'") except Exception: # noqa pylint: disable=broad-except flash('Failed to set state', 'error') diff --git a/breeze-complete b/breeze-complete index 63b34ad..5562cee 100644 --- a/breeze-complete +++ b/breeze-complete @@ -97,6 +97,7 @@ dont-use-safe-filter end-of-file-fixer fix-encoding-pragma flake8 +flynt forbid-tabs helm-lint identity diff --git a/docs/exts/airflow_intersphinx.py b/docs/exts/airflow_intersphinx.py index ee83b8f..a3bd262 100644 --- a/docs/exts/airflow_intersphinx.py +++ b/docs/exts/airflow_intersphinx.py @@ -150,7 +150,7 @@ if __name__ == "__main__": except ValueError as exc: print(exc.args[0] % exc.args[1:]) except Exception as exc: # pylint: disable=broad-except - print('Unknown error: %r' % exc) + print(f'Unknown error: {exc!r}') provider_mapping = _generate_provider_intersphinx_mapping() diff --git a/docs/exts/exampleinclude.py b/docs/exts/exampleinclude.py index 141ca82..815eb21 100644 --- a/docs/exts/exampleinclude.py +++ b/docs/exts/exampleinclude.py @@ -140,7 +140,7 @@ def register_source(app, env, modname): """ entry = env._viewcode_modules.get(modname, None) if entry is False: - print("[%s] Entry is false for " % modname) + print(f"[{modname}] Entry is false for ") return False code_tags = app.emit_firstresult("viewcode-find-source", modname) diff --git a/docs/exts/redirects.py b/docs/exts/redirects.py index 7874124..20603c4 100644 --- a/docs/exts/redirects.py +++ b/docs/exts/redirects.py @@ -54,7 +54,7 @@ def generate_redirects(app): from_path = from_path.replace(in_suffix, '.html') to_path 
= to_path.replace(in_suffix, ".html") - to_path_prefix = "..%s" % os.path.sep * (len(from_path.split(os.path.sep)) - 1) + to_path_prefix = f"..{os.path.sep}" * (len(from_path.split(os.path.sep)) - 1) to_path = to_path_prefix + to_path log.debug("Resolved redirect '%s' to '%s'", from_path, to_path) diff --git a/metastore_browser/hive_metastore.py b/metastore_browser/hive_metastore.py index 58d4da3..462f245 100644 --- a/metastore_browser/hive_metastore.py +++ b/metastore_browser/hive_metastore.py @@ -95,7 +95,7 @@ class MetastoreBrowserView(BaseView): def partitions(self): """Retrieve table partitions""" schema, table = request.args.get("table").split('.') - sql = """ + sql = f""" SELECT a.PART_NAME, a.CREATE_TIME, @@ -111,9 +111,7 @@ class MetastoreBrowserView(BaseView): b.TBL_NAME like '{table}' AND d.NAME like '{schema}' ORDER BY PART_NAME DESC - """.format( - table=table, schema=schema - ) + """ hook = MySqlHook(METASTORE_MYSQL_CONN_ID) df = hook.get_pandas_df(sql) return df.to_html( @@ -133,7 +131,7 @@ class MetastoreBrowserView(BaseView): if DB_DENY_LIST: dbs = ",".join(["'" + db + "'" for db in DB_DENY_LIST]) where_clause = f"AND b.name NOT IN ({dbs})" - sql = """ + sql = f""" SELECT CONCAT(b.NAME, '.', a.TBL_NAME), TBL_TYPE FROM TBLS a JOIN DBS b ON a.DB_ID = b.DB_ID @@ -143,10 +141,8 @@ class MetastoreBrowserView(BaseView): b.NAME NOT LIKE '%tmp%' AND b.NAME NOT LIKE '%temp%' {where_clause} - LIMIT {LIMIT}; - """.format( - where_clause=where_clause, LIMIT=TABLE_SELECTOR_LIMIT - ) + LIMIT {TABLE_SELECTOR_LIMIT}; + """ hook = MySqlHook(METASTORE_MYSQL_CONN_ID) data = [{'id': row[0], 'text': row[0]} for row in hook.get_records(sql)] return json.dumps(data) diff --git a/scripts/ci/pre_commit/pre_commit_check_provider_yaml_files.py b/scripts/ci/pre_commit/pre_commit_check_provider_yaml_files.py index 038e027..688f767 100755 --- a/scripts/ci/pre_commit/pre_commit_check_provider_yaml_files.py +++ b/scripts/ci/pre_commit/pre_commit_check_provider_yaml_files.py @@ 
-98,16 +98,16 @@ def assert_sets_equal(set1, set2): try: difference1 = set1.difference(set2) except TypeError as e: - raise AssertionError('invalid type when attempting set difference: %s' % e) + raise AssertionError(f'invalid type when attempting set difference: {e}') except AttributeError as e: - raise AssertionError('first argument does not support set difference: %s' % e) + raise AssertionError(f'first argument does not support set difference: {e}') try: difference2 = set2.difference(set1) except TypeError as e: - raise AssertionError('invalid type when attempting set difference: %s' % e) + raise AssertionError(f'invalid type when attempting set difference: {e}') except AttributeError as e: - raise AssertionError('second argument does not support set difference: %s' % e) + raise AssertionError(f'second argument does not support set difference: {e}') if not (difference1 or difference2): return diff --git a/scripts/ci/pre_commit/pre_commit_yaml_to_cfg.py b/scripts/ci/pre_commit/pre_commit_yaml_to_cfg.py index 6558901..0480286 100755 --- a/scripts/ci/pre_commit/pre_commit_yaml_to_cfg.py +++ b/scripts/ci/pre_commit/pre_commit_yaml_to_cfg.py @@ -121,7 +121,7 @@ def _write_option(configfile, idx, option): if option["example"]: if not str(option["name"]).endswith("_template"): option["example"] = option["example"].replace("{", "{{").replace("}", "}}") - configfile.write("# Example: {} = {}\n".format(option["name"], option["example"])) + configfile.write(f"# Example: {option['name']} = {option['example']}\n") if option["default"] is not None: if not isinstance(option["default"], str): @@ -134,9 +134,9 @@ def _write_option(configfile, idx, option): value = " " + option["default"] else: value = "" - configfile.write("{} ={}\n".format(option["name"], value)) + configfile.write(f"{option['name']} ={value}\n") else: - configfile.write("# {} =\n".format(option["name"])) + configfile.write(f"# {option['name']} =\n") if __name__ == '__main__': diff --git 
a/tests/api/common/experimental/test_pool.py b/tests/api/common/experimental/test_pool.py index ae00226..3c75a14 100644 --- a/tests/api/common/experimental/test_pool.py +++ b/tests/api/common/experimental/test_pool.py @@ -39,7 +39,7 @@ class TestPool(unittest.TestCase): clear_db_pools() self.pools = [Pool.get_default_pool()] for i in range(self.USER_POOL_COUNT): - name = 'experimental_%s' % (i + 1) + name = f'experimental_{i + 1}' pool = models.Pool( pool=name, slots=i, diff --git a/tests/cli/commands/test_connection_command.py b/tests/cli/commands/test_connection_command.py index ae78892..c81ff81 100644 --- a/tests/cli/commands/test_connection_command.py +++ b/tests/cli/commands/test_connection_command.py @@ -496,7 +496,7 @@ class TestCliAddConnections(unittest.TestCase): "connections", "add", "new0", - "--conn-uri=%s" % TEST_URL, + f"--conn-uri={TEST_URL}", "--conn-description=new0 description", ], "Successfully added `conn_id`=new0 : postgresql://airflow:airflow@host:5432/airflow", @@ -516,7 +516,7 @@ class TestCliAddConnections(unittest.TestCase): "connections", "add", "new1", - "--conn-uri=%s" % TEST_URL, + f"--conn-uri={TEST_URL}", "--conn-description=new1 description", ], "Successfully added `conn_id`=new1 : postgresql://airflow:airflow@host:5432/airflow", @@ -536,7 +536,7 @@ class TestCliAddConnections(unittest.TestCase): "connections", "add", "new2", - "--conn-uri=%s" % TEST_URL, + f"--conn-uri={TEST_URL}", "--conn-extra", "{'extra': 'yes'}", ], @@ -557,7 +557,7 @@ class TestCliAddConnections(unittest.TestCase): "connections", "add", "new3", - "--conn-uri=%s" % TEST_URL, + f"--conn-uri={TEST_URL}", "--conn-extra", "{'extra': 'yes'}", "--conn-description", @@ -651,12 +651,12 @@ class TestCliAddConnections(unittest.TestCase): def test_cli_connections_add_duplicate(self): conn_id = "to_be_duplicated" connection_command.connections_add( - self.parser.parse_args(["connections", "add", conn_id, "--conn-uri=%s" % TEST_URL]) + 
self.parser.parse_args(["connections", "add", conn_id, f"--conn-uri={TEST_URL}"]) ) # Check for addition attempt with pytest.raises(SystemExit, match=rf"A connection with `conn_id`={conn_id} already exists"): connection_command.connections_add( - self.parser.parse_args(["connections", "add", conn_id, "--conn-uri=%s" % TEST_URL]) + self.parser.parse_args(["connections", "add", conn_id, f"--conn-uri={TEST_URL}"]) ) def test_cli_connections_add_delete_with_missing_parameters(self): @@ -671,7 +671,7 @@ class TestCliAddConnections(unittest.TestCase): # Attempt to add with invalid uri with pytest.raises(SystemExit, match=r"The URI provided to --conn-uri is invalid: nonsense_uri"): connection_command.connections_add( - self.parser.parse_args(["connections", "add", "new1", "--conn-uri=%s" % "nonsense_uri"]) + self.parser.parse_args(["connections", "add", "new1", f"--conn-uri={'nonsense_uri'}"]) ) diff --git a/tests/core/test_core.py b/tests/core/test_core.py index fae2c73..f6dd6ee 100644 --- a/tests/core/test_core.py +++ b/tests/core/test_core.py @@ -172,7 +172,7 @@ class TestCore(unittest.TestCase): op = BashOperator( task_id='test_bash_operator_kill', execution_timeout=timedelta(seconds=1), - bash_command="/bin/bash -c 'sleep %s'" % sleep_time, + bash_command=f"/bin/bash -c 'sleep {sleep_time}'", dag=self.dag, ) with pytest.raises(AirflowTaskTimeout): diff --git a/tests/dags/test_subdag.py b/tests/dags/test_subdag.py index e45b2a1..f2227dc 100644 --- a/tests/dags/test_subdag.py +++ b/tests/dags/test_subdag.py @@ -48,7 +48,7 @@ def subdag(parent_dag_name, child_dag_name, args): for i in range(2): DummyOperator( - task_id='{}-task-{}'.format(child_dag_name, i + 1), + task_id=f'{child_dag_name}-task-{i + 1}', default_args=args, dag=dag_subdag, ) diff --git a/tests/dags_corrupted/test_impersonation_custom.py b/tests/dags_corrupted/test_impersonation_custom.py index 77ea1ed..4d3b978 100644 --- a/tests/dags_corrupted/test_impersonation_custom.py +++ 
b/tests/dags_corrupted/test_impersonation_custom.py @@ -39,7 +39,7 @@ dag = DAG(dag_id='impersonation_with_custom_pkg', default_args=args) def print_today(): date_time = FakeDatetime.utcnow() - print('Today is {}'.format(date_time.strftime('%Y-%m-%d'))) + print(f"Today is {date_time.strftime('%Y-%m-%d')}") def check_hive_conf(): diff --git a/tests/executors/test_celery_executor.py b/tests/executors/test_celery_executor.py index 944fa49..44edc47 100644 --- a/tests/executors/test_celery_executor.py +++ b/tests/executors/test_celery_executor.py @@ -391,7 +391,7 @@ class ClassWithCustomAttributes: setattr(self, key, value) def __str__(self): - return "{}({})".format(ClassWithCustomAttributes.__name__, str(self.__dict__)) + return f"{ClassWithCustomAttributes.__name__}({str(self.__dict__)})" def __repr__(self): return self.__str__() diff --git a/tests/hooks/test_dbapi.py b/tests/hooks/test_dbapi.py index 2cc916d..0f6c55a 100644 --- a/tests/hooks/test_dbapi.py +++ b/tests/hooks/test_dbapi.py @@ -125,7 +125,7 @@ class TestDbApiHook(unittest.TestCase): commit_count = 2 # The first and last commit assert commit_count == self.conn.commit.call_count - sql = "INSERT INTO {} ({}) VALUES (%s)".format(table, target_fields[0]) + sql = f"INSERT INTO {table} ({target_fields[0]}) VALUES (%s)" for row in rows: self.cur.execute.assert_any_call(sql, row) diff --git a/tests/models/test_baseoperator.py b/tests/models/test_baseoperator.py index 95607a6..f942b89 100644 --- a/tests/models/test_baseoperator.py +++ b/tests/models/test_baseoperator.py @@ -42,7 +42,7 @@ class ClassWithCustomAttributes: setattr(self, key, value) def __str__(self): - return "{}({})".format(ClassWithCustomAttributes.__name__, str(self.__dict__)) + return f"{ClassWithCustomAttributes.__name__}({str(self.__dict__)})" def __repr__(self): return self.__str__() @@ -154,7 +154,7 @@ class TestBaseOperator(unittest.TestCase): ({"user_defined_macros": {"foo": "bar"}}, "{{ foo }}", {}, "bar"), ({"user_defined_macros": 
{"foo": "bar"}}, 1, {}, 1), ( - {"user_defined_filters": {"hello": lambda name: "Hello %s" % name}}, + {"user_defined_filters": {"hello": lambda name: f"Hello {name}"}}, "{{ 'world' | hello }}", {}, "Hello world", diff --git a/tests/models/test_connection.py b/tests/models/test_connection.py index a96b89a..526d029 100644 --- a/tests/models/test_connection.py +++ b/tests/models/test_connection.py @@ -55,7 +55,7 @@ class UriTestCaseConfig: @staticmethod def uri_test_name(func, num, param): - return "{}_{}_{}".format(func.__name__, num, param.args[0].description.replace(' ', '_')) + return f"{func.__name__}_{num}_{param.args[0].description.replace(' ', '_')}" class TestConnection(unittest.TestCase): diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py index 60171d8..123c119 100644 --- a/tests/models/test_dag.py +++ b/tests/models/test_dag.py @@ -476,7 +476,7 @@ class TestDag(unittest.TestCase): def test_user_defined_filters(self): def jinja_udf(name): - return 'Hello %s' % name + return f'Hello {name}' dag = models.DAG('test-dag', start_date=DEFAULT_DATE, user_defined_filters={"hello": jinja_udf}) jinja_env = dag.get_template_env() @@ -1540,7 +1540,7 @@ class TestDag(unittest.TestCase): ) for i in range(2): - DummyOperator(task_id='{}-task-{}'.format(child_dag_name, i + 1), dag=dag_subdag) + DummyOperator(task_id=f'{child_dag_name}-task-{i + 1}', dag=dag_subdag) return dag_subdag diff --git a/tests/models/test_dagbag.py b/tests/models/test_dagbag.py index 6c6b1cb..ce11892 100644 --- a/tests/models/test_dagbag.py +++ b/tests/models/test_dagbag.py @@ -105,7 +105,7 @@ class TestDagBag(unittest.TestCase): dagbag = models.DagBag(include_examples=False, safe_mode=True) assert len(dagbag.dagbag_stats) == 1 - assert dagbag.dagbag_stats[0].file == "/{}".format(os.path.basename(f.name)) + assert dagbag.dagbag_stats[0].file == f"/{os.path.basename(f.name)}" def test_safe_mode_heuristic_mismatch(self): """With safe mode enabled, a file not matching the discovery 
heuristics @@ -122,7 +122,7 @@ class TestDagBag(unittest.TestCase): with conf_vars({('core', 'dags_folder'): self.empty_dir}): dagbag = models.DagBag(include_examples=False, safe_mode=False) assert len(dagbag.dagbag_stats) == 1 - assert dagbag.dagbag_stats[0].file == "/{}".format(os.path.basename(f.name)) + assert dagbag.dagbag_stats[0].file == f"/{os.path.basename(f.name)}" def test_process_file_that_contains_multi_bytes_char(self): """ @@ -298,7 +298,7 @@ class TestDagBag(unittest.TestCase): actual_found_dag_ids = list(map(lambda dag: dag.dag_id, actual_found_dags)) for dag_id in expected_dag_ids: - actual_dagbag.log.info('validating %s' % dag_id) + actual_dagbag.log.info(f'validating {dag_id}') assert ( dag_id in actual_found_dag_ids ) == should_be_found, 'dag "{}" should {}have been found after processing dag "{}"'.format( diff --git a/tests/models/test_renderedtifields.py b/tests/models/test_renderedtifields.py index 1cf4e3f..f754753 100644 --- a/tests/models/test_renderedtifields.py +++ b/tests/models/test_renderedtifields.py @@ -50,7 +50,7 @@ class ClassWithCustomAttributes: setattr(self, key, value) def __str__(self): - return "{}({})".format(ClassWithCustomAttributes.__name__, str(self.__dict__)) + return f"{ClassWithCustomAttributes.__name__}({str(self.__dict__)})" def __repr__(self): return self.__str__() diff --git a/tests/providers/amazon/aws/hooks/test_batch_waiters.py b/tests/providers/amazon/aws/hooks/test_batch_waiters.py index 51d42e0..b852c2e 100644 --- a/tests/providers/amazon/aws/hooks/test_batch_waiters.py +++ b/tests/providers/amazon/aws/hooks/test_batch_waiters.py @@ -198,7 +198,7 @@ def batch_infrastructure( assert resp["jobDefinitionArn"] job_definition_arn = resp["jobDefinitionArn"] assert resp["revision"] - assert resp["jobDefinitionArn"].endswith("{}:{}".format(resp["jobDefinitionName"], resp["revision"])) + assert resp["jobDefinitionArn"].endswith(f"{resp['jobDefinitionName']}:{resp['revision']}") infrastructure.vpc_id = vpc_id 
infrastructure.subnet_id = subnet_id diff --git a/tests/providers/amazon/aws/hooks/test_s3.py b/tests/providers/amazon/aws/hooks/test_s3.py index d962068..fc00d9d 100644 --- a/tests/providers/amazon/aws/hooks/test_s3.py +++ b/tests/providers/amazon/aws/hooks/test_s3.py @@ -118,8 +118,8 @@ class TestAwsS3Hook: bucket = hook.get_bucket(s3_bucket) # we don't need to test the paginator that's covered by boto tests - keys = ["%s/b" % i for i in range(2)] - dirs = ["%s/" % i for i in range(2)] + keys = [f"{i}/b" for i in range(2)] + dirs = [f"{i}/" for i in range(2)] for key in keys: bucket.put_object(Key=key, Body=b'a') diff --git a/tests/providers/apache/hive/operators/test_hive_stats.py b/tests/providers/apache/hive/operators/test_hive_stats.py index d38e007..02dbdd8 100644 --- a/tests/providers/apache/hive/operators/test_hive_stats.py +++ b/tests/providers/apache/hive/operators/test_hive_stats.py @@ -274,17 +274,13 @@ class TestHiveStatsCollectionOperator(TestHiveEnvironment): hive_stats_collection_operator = HiveStatsCollectionOperator(**self.kwargs) hive_stats_collection_operator.execute(context={}) - sql = """ + sql = f""" DELETE FROM hive_stats WHERE - table_name='{}' AND - partition_repr='{}' AND - dttm='{}'; - """.format( - hive_stats_collection_operator.table, - mock_json_dumps.return_value, - hive_stats_collection_operator.dttm, - ) + table_name='{hive_stats_collection_operator.table}' AND + partition_repr='{mock_json_dumps.return_value}' AND + dttm='{hive_stats_collection_operator.dttm}'; + """ mock_mysql_hook.return_value.run.assert_called_once_with(sql) @unittest.skipIf( diff --git a/tests/providers/apache/hive/transfers/test_mysql_to_hive.py b/tests/providers/apache/hive/transfers/test_mysql_to_hive.py index c6f7736..0413b5c 100644 --- a/tests/providers/apache/hive/transfers/test_mysql_to_hive.py +++ b/tests/providers/apache/hive/transfers/test_mysql_to_hive.py @@ -317,8 +317,8 @@ class TestTransfer(unittest.TestCase): with hook.get_conn() as conn: 
conn.execute(f"DROP TABLE IF EXISTS {mysql_table}") conn.execute( - """ - CREATE TABLE {} ( + f""" + CREATE TABLE {mysql_table} ( c0 TINYINT, c1 SMALLINT, c2 MEDIUMINT, @@ -326,9 +326,7 @@ class TestTransfer(unittest.TestCase): c4 BIGINT, c5 TIMESTAMP ) - """.format( - mysql_table - ) + """ ) op = MySqlToHiveOperator( @@ -368,14 +366,12 @@ class TestTransfer(unittest.TestCase): with hook.get_conn() as conn: conn.execute(f"DROP TABLE IF EXISTS {mysql_table}") conn.execute( - """ - CREATE TABLE {} ( + f""" + CREATE TABLE {mysql_table} ( c0 VARCHAR(25), c1 VARCHAR(25) ) - """.format( - mysql_table - ) + """ ) conn.execute( """ @@ -475,8 +471,8 @@ class TestTransfer(unittest.TestCase): with hook.get_conn() as conn: conn.execute(f"DROP TABLE IF EXISTS {mysql_table}") conn.execute( - """ - CREATE TABLE {} ( + f""" + CREATE TABLE {mysql_table} ( c0 TINYINT UNSIGNED, c1 SMALLINT UNSIGNED, c2 MEDIUMINT UNSIGNED, @@ -488,9 +484,7 @@ class TestTransfer(unittest.TestCase): c8 INT, c9 BIGINT ) - """.format( - mysql_table - ) + """ ) conn.execute( """ diff --git a/tests/providers/apache/spark/hooks/test_spark_sql.py b/tests/providers/apache/spark/hooks/test_spark_sql.py index 85a5159..35e4330 100644 --- a/tests/providers/apache/spark/hooks/test_spark_sql.py +++ b/tests/providers/apache/spark/hooks/test_spark_sql.py @@ -60,11 +60,11 @@ class TestSparkSqlHook(unittest.TestCase): cmd = ' '.join(hook._prepare_command("")) # Check all the parameters - assert "--executor-cores {}".format(self._config['executor_cores']) in cmd - assert "--executor-memory {}".format(self._config['executor_memory']) in cmd - assert "--keytab {}".format(self._config['keytab']) in cmd - assert "--name {}".format(self._config['name']) in cmd - assert "--num-executors {}".format(self._config['num_executors']) in cmd + assert f"--executor-cores {self._config['executor_cores']}" in cmd + assert f"--executor-memory {self._config['executor_memory']}" in cmd + assert f"--keytab {self._config['keytab']}" in cmd + 
assert f"--name {self._config['name']}" in cmd + assert f"--num-executors {self._config['num_executors']}" in cmd sql_path = get_after('-f', hook._prepare_command("")) assert self._config['sql'].strip() == sql_path diff --git a/tests/providers/apache/sqoop/hooks/test_sqoop.py b/tests/providers/apache/sqoop/hooks/test_sqoop.py index 332021a..08926d4 100644 --- a/tests/providers/apache/sqoop/hooks/test_sqoop.py +++ b/tests/providers/apache/sqoop/hooks/test_sqoop.py @@ -179,29 +179,29 @@ class TestSqoopHook(unittest.TestCase): # Check if the config has been extracted from the json if self._config_json['namenode']: - assert "-fs {}".format(self._config_json['namenode']) in cmd + assert f"-fs {self._config_json['namenode']}" in cmd if self._config_json['job_tracker']: - assert "-jt {}".format(self._config_json['job_tracker']) in cmd + assert f"-jt {self._config_json['job_tracker']}" in cmd if self._config_json['libjars']: - assert "-libjars {}".format(self._config_json['libjars']) in cmd + assert f"-libjars {self._config_json['libjars']}" in cmd if self._config_json['files']: - assert "-files {}".format(self._config_json['files']) in cmd + assert f"-files {self._config_json['files']}" in cmd if self._config_json['archives']: - assert "-archives {}".format(self._config_json['archives']) in cmd + assert f"-archives {self._config_json['archives']}" in cmd - assert "--hcatalog-database {}".format(self._config['hcatalog_database']) in cmd - assert "--hcatalog-table {}".format(self._config['hcatalog_table']) in cmd + assert f"--hcatalog-database {self._config['hcatalog_database']}" in cmd + assert f"--hcatalog-table {self._config['hcatalog_table']}" in cmd # Check the regulator stuff passed by the default constructor if self._config['verbose']: assert "--verbose" in cmd if self._config['num_mappers']: - assert "--num-mappers {}".format(self._config['num_mappers']) in cmd + assert f"--num-mappers {self._config['num_mappers']}" in cmd for key, value in 
self._config['properties'].items(): assert f"-D {key}={value}" in cmd @@ -243,21 +243,14 @@ class TestSqoopHook(unittest.TestCase): ) ) - assert "--input-null-string {}".format(self._config_export['input_null_string']) in cmd - assert "--input-null-non-string {}".format(self._config_export['input_null_non_string']) in cmd - assert "--staging-table {}".format(self._config_export['staging_table']) in cmd - assert "--enclosed-by {}".format(self._config_export['enclosed_by']) in cmd - assert "--escaped-by {}".format(self._config_export['escaped_by']) in cmd - assert ( - "--input-fields-terminated-by {}".format(self._config_export['input_fields_terminated_by']) in cmd - ) - assert ( - "--input-lines-terminated-by {}".format(self._config_export['input_lines_terminated_by']) in cmd - ) - assert ( - "--input-optionally-enclosed-by {}".format(self._config_export['input_optionally_enclosed_by']) - in cmd - ) + assert f"--input-null-string {self._config_export['input_null_string']}" in cmd + assert f"--input-null-non-string {self._config_export['input_null_non_string']}" in cmd + assert f"--staging-table {self._config_export['staging_table']}" in cmd + assert f"--enclosed-by {self._config_export['enclosed_by']}" in cmd + assert f"--escaped-by {self._config_export['escaped_by']}" in cmd + assert f"--input-fields-terminated-by {self._config_export['input_fields_terminated_by']}" in cmd + assert f"--input-lines-terminated-by {self._config_export['input_lines_terminated_by']}" in cmd + assert f"--input-optionally-enclosed-by {self._config_export['input_optionally_enclosed_by']}" in cmd # these options are from the extra export options assert "--update-key id" in cmd assert "--update-mode allowinsert" in cmd @@ -301,10 +294,10 @@ class TestSqoopHook(unittest.TestCase): if self._config_import['direct']: assert '--direct' in cmd - assert '--target-dir {}'.format(self._config_import['target_dir']) in cmd + assert f"--target-dir {self._config_import['target_dir']}" in cmd - assert 
'--driver {}'.format(self._config_import['driver']) in cmd - assert '--split-by {}'.format(self._config_import['split_by']) in cmd + assert f"--driver {self._config_import['driver']}" in cmd + assert f"--split-by {self._config_import['split_by']}" in cmd # these are from extra options, but not passed to this cmd import command assert '--show' not in cmd assert 'hcatalog-storage-stanza \"stored as orcfile\"' not in cmd diff --git a/tests/providers/elasticsearch/log/elasticmock/__init__.py b/tests/providers/elasticsearch/log/elasticmock/__init__.py index 2490dbe..c2a9080 100644 --- a/tests/providers/elasticsearch/log/elasticmock/__init__.py +++ b/tests/providers/elasticsearch/log/elasticmock/__init__.py @@ -51,7 +51,7 @@ ELASTIC_INSTANCES = {} # type: Dict[str, FakeElasticsearch] def _get_elasticmock(hosts=None, *args, **kwargs): # pylint: disable=unused-argument host = _normalize_hosts(hosts)[0] - elastic_key = '{}:{}'.format(host.get('host', 'localhost'), host.get('port', 9200)) + elastic_key = f"{host.get('host', 'localhost')}:{host.get('port', 9200)}" if elastic_key in ELASTIC_INSTANCES: connection = ELASTIC_INSTANCES.get(elastic_key) diff --git a/tests/providers/google/cloud/hooks/test_cloud_sql.py b/tests/providers/google/cloud/hooks/test_cloud_sql.py index 003245d..9e9ec70 100644 --- a/tests/providers/google/cloud/hooks/test_cloud_sql.py +++ b/tests/providers/google/cloud/hooks/test_cloud_sql.py @@ -1061,9 +1061,7 @@ class TestCloudSqlDatabaseQueryHook(unittest.TestCase): project = self.sql_connection.extra_dejson['project_id'] location = self.sql_connection.extra_dejson['location'] instance = self.sql_connection.extra_dejson['instance'] - instance_spec = "{project}:{location}:{instance}".format( - project=project, location=location, instance=instance - ) + instance_spec = f"{project}:{location}:{instance}" assert sqlproxy_runner.instance_specification == instance_spec 
@mock.patch("airflow.providers.google.cloud.hooks.cloud_sql.CloudSQLDatabaseHook.get_connection") diff --git a/tests/providers/google/cloud/hooks/test_pubsub.py b/tests/providers/google/cloud/hooks/test_pubsub.py index 0841806..4086526 100644 --- a/tests/providers/google/cloud/hooks/test_pubsub.py +++ b/tests/providers/google/cloud/hooks/test_pubsub.py @@ -130,17 +130,17 @@ class TestPubSubHook(unittest.TestCase): @mock.patch(PUBSUB_STRING.format('PubSubHook.get_conn')) def test_delete_nonexisting_topic_failifnotexists(self, mock_service): mock_service.return_value.delete_topic.side_effect = NotFound( - 'Topic does not exists: %s' % EXPANDED_TOPIC + f'Topic does not exists: {EXPANDED_TOPIC}' ) with pytest.raises(PubSubException) as ctx: self.pubsub_hook.delete_topic(project_id=TEST_PROJECT, topic=TEST_TOPIC, fail_if_not_exists=True) - assert str(ctx.value) == 'Topic does not exist: %s' % EXPANDED_TOPIC + assert str(ctx.value) == f'Topic does not exist: {EXPANDED_TOPIC}' @mock.patch(PUBSUB_STRING.format('PubSubHook.get_conn')) def test_delete_topic_api_call_error(self, mock_service): mock_service.return_value.delete_topic.side_effect = GoogleAPICallError( - 'Error deleting topic: %s' % EXPANDED_TOPIC + f'Error deleting topic: {EXPANDED_TOPIC}' ) with pytest.raises(PubSubException): self.pubsub_hook.delete_topic(project_id=TEST_PROJECT, topic=TEST_TOPIC, fail_if_not_exists=True) @@ -148,23 +148,23 @@ class TestPubSubHook(unittest.TestCase): @mock.patch(PUBSUB_STRING.format('PubSubHook.get_conn')) def test_create_preexisting_topic_failifexists(self, mock_service): mock_service.return_value.create_topic.side_effect = AlreadyExists( - 'Topic already exists: %s' % TEST_TOPIC + f'Topic already exists: {TEST_TOPIC}' ) with pytest.raises(PubSubException) as ctx: self.pubsub_hook.create_topic(project_id=TEST_PROJECT, topic=TEST_TOPIC, fail_if_exists=True) - assert str(ctx.value) == 'Topic already exists: %s' % TEST_TOPIC + assert str(ctx.value) == f'Topic already exists: 
{TEST_TOPIC}' @mock.patch(PUBSUB_STRING.format('PubSubHook.get_conn')) def test_create_preexisting_topic_nofailifexists(self, mock_service): mock_service.return_value.create_topic.side_effect = AlreadyExists( - 'Topic already exists: %s' % EXPANDED_TOPIC + f'Topic already exists: {EXPANDED_TOPIC}' ) self.pubsub_hook.create_topic(project_id=TEST_PROJECT, topic=TEST_TOPIC) @mock.patch(PUBSUB_STRING.format('PubSubHook.get_conn')) def test_create_topic_api_call_error(self, mock_service): mock_service.return_value.create_topic.side_effect = GoogleAPICallError( - 'Error creating topic: %s' % TEST_TOPIC + f'Error creating topic: {TEST_TOPIC}' ) with pytest.raises(PubSubException): self.pubsub_hook.create_topic(project_id=TEST_PROJECT, topic=TEST_TOPIC, fail_if_exists=True) @@ -238,18 +238,18 @@ class TestPubSubHook(unittest.TestCase): @mock.patch(PUBSUB_STRING.format('PubSubHook.subscriber_client')) def test_delete_nonexisting_subscription_failifnotexists(self, mock_service): mock_service.delete_subscription.side_effect = NotFound( - 'Subscription does not exists: %s' % EXPANDED_SUBSCRIPTION + f'Subscription does not exists: {EXPANDED_SUBSCRIPTION}' ) with pytest.raises(PubSubException) as ctx: self.pubsub_hook.delete_subscription( project_id=TEST_PROJECT, subscription=TEST_SUBSCRIPTION, fail_if_not_exists=True ) - assert str(ctx.value) == 'Subscription does not exist: %s' % EXPANDED_SUBSCRIPTION + assert str(ctx.value) == f'Subscription does not exist: {EXPANDED_SUBSCRIPTION}' @mock.patch(PUBSUB_STRING.format('PubSubHook.subscriber_client')) def test_delete_subscription_api_call_error(self, mock_service): mock_service.delete_subscription.side_effect = GoogleAPICallError( - 'Error deleting subscription %s' % EXPANDED_SUBSCRIPTION + f'Error deleting subscription {EXPANDED_SUBSCRIPTION}' ) with pytest.raises(PubSubException): self.pubsub_hook.delete_subscription( @@ -262,7 +262,7 @@ class TestPubSubHook(unittest.TestCase): self, mock_uuid, mock_service ): # noqa # pylint: 
disable=unused-argument,line-too-long create_method = mock_service.create_subscription - expected_name = EXPANDED_SUBSCRIPTION.replace(TEST_SUBSCRIPTION, 'sub-%s' % TEST_UUID) + expected_name = EXPANDED_SUBSCRIPTION.replace(TEST_SUBSCRIPTION, f'sub-{TEST_UUID}') response = self.pubsub_hook.create_subscription(project_id=TEST_PROJECT, topic=TEST_TOPIC) create_method.assert_called_once_with( @@ -282,7 +282,7 @@ class TestPubSubHook(unittest.TestCase): timeout=None, metadata=None, ) - assert 'sub-%s' % TEST_UUID == response + assert f'sub-{TEST_UUID}' == response @mock.patch(PUBSUB_STRING.format('PubSubHook.subscriber_client')) def test_create_subscription_with_ack_deadline(self, mock_service): @@ -342,18 +342,18 @@ class TestPubSubHook(unittest.TestCase): @mock.patch(PUBSUB_STRING.format('PubSubHook.subscriber_client')) def test_create_subscription_failifexists(self, mock_service): mock_service.create_subscription.side_effect = AlreadyExists( - 'Subscription already exists: %s' % EXPANDED_SUBSCRIPTION + f'Subscription already exists: {EXPANDED_SUBSCRIPTION}' ) with pytest.raises(PubSubException) as ctx: self.pubsub_hook.create_subscription( project_id=TEST_PROJECT, topic=TEST_TOPIC, subscription=TEST_SUBSCRIPTION, fail_if_exists=True ) - assert str(ctx.value) == 'Subscription already exists: %s' % EXPANDED_SUBSCRIPTION + assert str(ctx.value) == f'Subscription already exists: {EXPANDED_SUBSCRIPTION}' @mock.patch(PUBSUB_STRING.format('PubSubHook.subscriber_client')) def test_create_subscription_api_call_error(self, mock_service): mock_service.create_subscription.side_effect = GoogleAPICallError( - 'Error creating subscription %s' % EXPANDED_SUBSCRIPTION + f'Error creating subscription {EXPANDED_SUBSCRIPTION}' ) with pytest.raises(PubSubException): self.pubsub_hook.create_subscription( @@ -363,7 +363,7 @@ class TestPubSubHook(unittest.TestCase): @mock.patch(PUBSUB_STRING.format('PubSubHook.subscriber_client')) def test_create_subscription_nofailifexists(self, 
mock_service): mock_service.create_subscription.side_effect = AlreadyExists( - 'Subscription already exists: %s' % EXPANDED_SUBSCRIPTION + f'Subscription already exists: {EXPANDED_SUBSCRIPTION}' ) response = self.pubsub_hook.create_subscription( project_id=TEST_PROJECT, topic=TEST_TOPIC, subscription=TEST_SUBSCRIPTION diff --git a/tests/providers/google/cloud/operators/test_dataflow.py b/tests/providers/google/cloud/operators/test_dataflow.py index 7e290d7..c682a31 100644 --- a/tests/providers/google/cloud/operators/test_dataflow.py +++ b/tests/providers/google/cloud/operators/test_dataflow.py @@ -55,7 +55,7 @@ DEFAULT_OPTIONS_TEMPLATE = { 'zone': 'us-central1-f', } ADDITIONAL_OPTIONS = {'output': 'gs://test/output', 'labels': {'foo': 'bar'}} -TEST_VERSION = 'v{}'.format(version.replace('.', '-').replace('+', '-')) +TEST_VERSION = f"v{version.replace('.', '-').replace('+', '-')}" EXPECTED_ADDITIONAL_OPTIONS = { 'output': 'gs://test/output', 'labels': {'foo': 'bar', 'airflow-version': TEST_VERSION}, diff --git a/tests/providers/google/cloud/operators/test_mlengine_utils.py b/tests/providers/google/cloud/operators/test_mlengine_utils.py index 539ee60..65b41b6 100644 --- a/tests/providers/google/cloud/operators/test_mlengine_utils.py +++ b/tests/providers/google/cloud/operators/test_mlengine_utils.py @@ -28,7 +28,7 @@ from airflow.providers.google.cloud.utils import mlengine_operator_utils from airflow.version import version DEFAULT_DATE = datetime.datetime(2017, 6, 6) -TEST_VERSION = 'v{}'.format(version.replace('.', '-').replace('+', '-')) +TEST_VERSION = f"v{version.replace('.', '-').replace('+', '-')}" class TestCreateEvaluateOps(unittest.TestCase): @@ -80,7 +80,7 @@ class TestCreateEvaluateOps(unittest.TestCase): input_paths=input_with_model['inputPaths'], prediction_path=input_with_model['outputPath'], metric_fn_and_keys=(self.metric_fn, ['err']), - validate_fn=(lambda x: 'err=%.1f' % x['err']), + validate_fn=(lambda x: f"err={x['err']:.1f}"), dag=self.dag, 
py_interpreter="python3", ) @@ -168,7 +168,7 @@ class TestCreateEvaluateOps(unittest.TestCase): 'input_paths': input_with_model['inputPaths'], 'prediction_path': input_with_model['outputPath'], 'metric_fn_and_keys': (self.metric_fn, ['err']), - 'validate_fn': (lambda x: 'err=%.1f' % x['err']), + 'validate_fn': (lambda x: f"err={x['err']:.1f}"), } with pytest.raises(AirflowException, match='Missing model origin'): diff --git a/tests/providers/google/cloud/sensors/test_gcs.py b/tests/providers/google/cloud/sensors/test_gcs.py index e3c8917..77e0c35 100644 --- a/tests/providers/google/cloud/sensors/test_gcs.py +++ b/tests/providers/google/cloud/sensors/test_gcs.py @@ -175,7 +175,7 @@ class TestGoogleCloudStoragePrefixSensor(TestCase): impersonation_chain=TEST_IMPERSONATION_CHAIN, poke_interval=0, ) - generated_messages = ['test-prefix/obj%s' % i for i in range(5)] + generated_messages = [f'test-prefix/obj{i}' for i in range(5)] mock_hook.return_value.list.return_value = generated_messages response = task.execute(None) diff --git a/tests/providers/google/cloud/transfers/test_bigquery_to_bigquery.py b/tests/providers/google/cloud/transfers/test_bigquery_to_bigquery.py index 1621829..b3c0acd 100644 --- a/tests/providers/google/cloud/transfers/test_bigquery_to_bigquery.py +++ b/tests/providers/google/cloud/transfers/test_bigquery_to_bigquery.py @@ -30,7 +30,7 @@ class TestBigQueryToBigQueryOperator(unittest.TestCase): @mock.patch('airflow.providers.google.cloud.transfers.bigquery_to_bigquery.BigQueryHook') def test_execute(self, mock_hook): source_project_dataset_tables = f'{TEST_DATASET}.{TEST_TABLE_ID}' - destination_project_dataset_table = '{}.{}'.format(TEST_DATASET + '_new', TEST_TABLE_ID) + destination_project_dataset_table = f"{TEST_DATASET + '_new'}.{TEST_TABLE_ID}" write_disposition = 'WRITE_EMPTY' create_disposition = 'CREATE_IF_NEEDED' labels = {'k1': 'v1'} diff --git a/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py 
b/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py index 5c0c38c..2df9fea 100644 --- a/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py +++ b/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py @@ -143,7 +143,7 @@ class TestGoogleCloudStorageToCloudStorageOperator(unittest.TestCase): source_bucket=TEST_BUCKET, source_object=SOURCE_OBJECT_WILDCARD_FILENAME, destination_bucket=DESTINATION_BUCKET, - destination_object='{}/{}'.format(DESTINATION_OBJECT_PREFIX, SOURCE_OBJECT_WILDCARD_SUFFIX[:-1]), + destination_object=f'{DESTINATION_OBJECT_PREFIX}/{SOURCE_OBJECT_WILDCARD_SUFFIX[:-1]}', ) operator.execute(None) diff --git a/tests/providers/google/cloud/utils/test_mlengine_operator_utils.py b/tests/providers/google/cloud/utils/test_mlengine_operator_utils.py index 289eb43..5e19c17 100644 --- a/tests/providers/google/cloud/utils/test_mlengine_operator_utils.py +++ b/tests/providers/google/cloud/utils/test_mlengine_operator_utils.py @@ -83,11 +83,11 @@ METRIC_KEYS_EXPECTED = ','.join(METRIC_KEYS) def validate_err_and_count(summary): if summary['err'] > 0.2: - raise ValueError('Too high err>0.2; summary=%s' % summary) + raise ValueError(f'Too high err>0.2; summary={summary}') if summary['mse'] > 0.05: - raise ValueError('Too high mse>0.05; summary=%s' % summary) + raise ValueError(f'Too high mse>0.05; summary={summary}') if summary['count'] < 1000: - raise ValueError('Too few instances<1000; summary=%s' % summary) + raise ValueError(f'Too few instances<1000; summary={summary}') return summary diff --git a/tests/providers/mysql/hooks/test_mysql.py b/tests/providers/mysql/hooks/test_mysql.py index 538381f..19f0bd8 100644 --- a/tests/providers/mysql/hooks/test_mysql.py +++ b/tests/providers/mysql/hooks/test_mysql.py @@ -431,10 +431,8 @@ class TestMySql(unittest.TestCase): from tests.test_utils.asserts import assert_equal_ignore_multiple_spaces assert mock_execute.call_count == 1 - query = """ + query = f""" SELECT * INTO OUTFILE '{tmp_file}' FROM {table} 
- """.format( - tmp_file=tmp_file, table=table - ) + """ assert_equal_ignore_multiple_spaces(self, mock_execute.call_args[0][0], query) diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py index 2046e22..9c75513 100644 --- a/tests/serialization/test_dag_serialization.py +++ b/tests/serialization/test_dag_serialization.py @@ -197,7 +197,7 @@ def make_user_defined_macro_filter_dag(): user_defined_macros={ 'next_execution_date': compute_next_execution_date, }, - user_defined_filters={'hello': lambda name: 'Hello %s' % name}, + user_defined_filters={'hello': lambda name: f'Hello {name}'}, catchup=False, ) BashOperator( @@ -731,7 +731,7 @@ class TestStringifiedDAGs(unittest.TestCase): setattr(self, key, value) def __str__(self): - return "{}({})".format(self.__class__.__name__, str(self.__dict__)) + return f"{self.__class__.__name__}({str(self.__dict__)})" def __repr__(self): return self.__str__() diff --git a/tests/test_utils/gcp_system_helpers.py b/tests/test_utils/gcp_system_helpers.py index 6572111..314f09a 100644 --- a/tests/test_utils/gcp_system_helpers.py +++ b/tests/test_utils/gcp_system_helpers.py @@ -184,7 +184,7 @@ class GoogleSystemTest(SystemTest): "gsutil", "iam", "ch", - "serviceAccount:%s:admin" % account_email, + f"serviceAccount:{account_email}:admin", bucket_name, ] ) diff --git a/tests/test_utils/logging_command_executor.py b/tests/test_utils/logging_command_executor.py index 1ebf729..5fca244 100644 --- a/tests/test_utils/logging_command_executor.py +++ b/tests/test_utils/logging_command_executor.py @@ -57,7 +57,7 @@ class LoggingCommandExecutor(LoggingMixin): self.log.info("Stdout: %s", output) self.log.info("Stderr: %s", err) raise AirflowException( - "Retcode {} on {} with stdout: {}, stderr: {}".format(retcode, " ".join(cmd), output, err) + f"Retcode {retcode} on {' '.join(cmd)} with stdout: {output}, stderr: {err}" ) return output diff --git a/tests/test_utils/mock_operators.py 
b/tests/test_utils/mock_operators.py index 534770e..989b984 100644 --- a/tests/test_utils/mock_operators.py +++ b/tests/test_utils/mock_operators.py @@ -82,7 +82,7 @@ class CustomBaseIndexOpLink(BaseOperatorLink): @property def name(self) -> str: - return 'BigQuery Console #{index}'.format(index=self.index + 1) + return f'BigQuery Console #{self.index + 1}' def get_link(self, operator, dttm): ti = TaskInstance(task=operator, execution_date=dttm) diff --git a/tests/utils/test_helpers.py b/tests/utils/test_helpers.py index fffa2d4..0477c01 100644 --- a/tests/utils/test_helpers.py +++ b/tests/utils/test_helpers.py @@ -43,9 +43,7 @@ class TestHelpers(unittest.TestCase): filename_template = "{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log" ts = ti.get_template_context()['ts'] - expected_filename = "{dag_id}/{task_id}/{ts}/{try_number}.log".format( - dag_id=dag_id, task_id=task_id, ts=ts, try_number=try_number - ) + expected_filename = f"{dag_id}/{task_id}/{ts}/{try_number}.log" rendered_filename = helpers.render_log_filename(ti, try_number, filename_template) diff --git a/tests/www/api/experimental/test_endpoints.py b/tests/www/api/experimental/test_endpoints.py index 5981eac..1a379d9 100644 --- a/tests/www/api/experimental/test_endpoints.py +++ b/tests/www/api/experimental/test_endpoints.py @@ -368,7 +368,7 @@ class TestPoolApiExperimental(TestBase): clear_db_pools() self.pools = [Pool.get_default_pool()] for i in range(self.USER_POOL_COUNT): - name = 'experimental_%s' % (i + 1) + name = f'experimental_{i + 1}' pool = Pool( pool=name, slots=i, diff --git a/tests/www/test_views.py b/tests/www/test_views.py index 5011547..929076b 100644 --- a/tests/www/test_views.py +++ b/tests/www/test_views.py @@ -757,7 +757,7 @@ class TestAirflowBaseViews(TestBase): url = 'dag_details?dag_id=test_tree_view' resp = self.client.get(url, follow_redirects=True) params = {'dag_id': 'test_tree_view', 'origin': '/tree?dag_id=test_tree_view'} - href = 
"/trigger?{}".format(html.escape(urllib.parse.urlencode(params))) + href = f"/trigger?{html.escape(urllib.parse.urlencode(params))}" self.check_content_in_response(href, resp) def test_dag_details_trigger_origin_graph_view(self): @@ -772,7 +772,7 @@ class TestAirflowBaseViews(TestBase): url = 'dag_details?dag_id=test_graph_view' resp = self.client.get(url, follow_redirects=True) params = {'dag_id': 'test_graph_view', 'origin': '/graph?dag_id=test_graph_view'} - href = "/trigger?{}".format(html.escape(urllib.parse.urlencode(params))) + href = f"/trigger?{html.escape(urllib.parse.urlencode(params))}" self.check_content_in_response(href, resp) def test_dag_details_subdag(self): @@ -1177,9 +1177,7 @@ class TestLogView(TestBase): DAG_ID_REMOVED = 'removed_dag_for_testing_log_view' TASK_ID = 'task_for_testing_log_view' DEFAULT_DATE = timezone.datetime(2017, 9, 1) - ENDPOINT = 'log?dag_id={dag_id}&task_id={task_id}&execution_date={execution_date}'.format( - dag_id=DAG_ID, task_id=TASK_ID, execution_date=DEFAULT_DATE - ) + ENDPOINT = f'log?dag_id={DAG_ID}&task_id={TASK_ID}&execution_date={DEFAULT_DATE}' def setUp(self): # Make sure that the configure_logging is not cached @@ -1277,7 +1275,7 @@ class TestLogView(TestBase): for num in range(1, expected_num_logs_visible + 1): assert f'log-group-{num}' in response.data.decode('utf-8') assert 'log-group-0' not in response.data.decode('utf-8') - assert 'log-group-{}'.format(expected_num_logs_visible + 1) not in response.data.decode('utf-8') + assert f'log-group-{expected_num_logs_visible + 1}' not in response.data.decode('utf-8') def test_get_logs_with_metadata_as_download_file(self): url_template = ( @@ -1540,7 +1538,7 @@ class ViewWithDateTimeAndNumRunsAndDagRunsFormTester: Should set base date to execution date. 
""" response = self.test.client.get( - self.endpoint + '&execution_date={}'.format(self.runs[1].execution_date.isoformat()), + self.endpoint + f'&execution_date={self.runs[1].execution_date.isoformat()}', data=dict(username='test', password='test'), follow_redirects=True, ) @@ -1563,7 +1561,7 @@ class ViewWithDateTimeAndNumRunsAndDagRunsFormTester: Should set base date and num runs to submitted values. """ response = self.test.client.get( - self.endpoint + '&base_date={}&num_runs=2'.format(self.runs[1].execution_date.isoformat()), + self.endpoint + f'&base_date={self.runs[1].execution_date.isoformat()}&num_runs=2', data=dict(username='test', password='test'), follow_redirects=True, ) @@ -2849,7 +2847,7 @@ class TestExtraLinks(TestBase): name = 'foo-bar' def get_link(self, operator, dttm): - return 'http://www.example.com/{}/{}/{}'.format(operator.task_id, 'foo-bar', dttm) + return f"http://www.example.com/{operator.task_id}/foo-bar/{dttm}" class AirflowLink(BaseOperatorLink): name = 'airflow'
