This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch v3-1-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 84a3816767b0f3f78a0a188c60f5965c94aa9d46 Author: Jens Scheffler <[email protected]> AuthorDate: Fri Oct 31 20:39:08 2025 +0100 [v3-1-test] Enable ruff PLW0602 rule (apache#57588) (#57638) * Enable ruff PLW0602 rule (#57588) (cherry picked from commit d52c4fb417663803ab7e2cc26baa9d4234382b81) * Enable ruff PLW0129 rule (#57516) * Enable ruff PLW0129 rule * Fix pytests * Fix pytests as working in logs changed * Fix pytests as working in logs changed * [v3-1-test] Enable ruff PLW0602 rule (#57588) (cherry picked from commit d52c4fb417663803ab7e2cc26baa9d4234382b81) Co-authored-by: Jens Scheffler <[email protected]> --- airflow-core/src/airflow/api_fastapi/app.py | 2 -- airflow-core/src/airflow/configuration.py | 2 +- airflow-core/src/airflow/logging_config.py | 1 - airflow-core/src/airflow/plugins_manager.py | 13 ------------- .../tests/airflowctl_tests/conftest.py | 1 - .../prepare_providers/provider_documentation.py | 1 - .../aws/executors/batch/test_batch_executor.py | 5 ++++- .../tests/unit/apache/livy/operators/test_livy.py | 5 ++++- .../apache/spark/operators/test_spark_submit.py | 5 ++++- .../unit/cncf/kubernetes/operators/test_pod.py | 7 ++++++- .../unit/cncf/kubernetes/triggers/test_pod.py | 14 +++++++------- .../google/cloud/triggers/test_bigquery_dts.py | 7 +++++-- .../unit/google/cloud/triggers/test_dataflow.py | 4 ++-- .../unit/google/cloud/triggers/test_dataplex.py | 2 +- .../cloud/triggers/test_kubernetes_engine.py | 22 +++++++++++----------- .../tests/unit/standard/operators/test_python.py | 1 - pyproject.toml | 7 ++++++- .../check_providers_subpackages_all_have_init.py | 2 -- 18 files changed, 51 insertions(+), 50 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/app.py b/airflow-core/src/airflow/api_fastapi/app.py index 3e76c4f8f2d..58cfb157083 100644 --- a/airflow-core/src/airflow/api_fastapi/app.py +++ b/airflow-core/src/airflow/api_fastapi/app.py @@ -159,8 +159,6 @@ def init_auth_manager(app: FastAPI | None = None) -> BaseAuthManager: def get_auth_manager() -> BaseAuthManager: """Return the auth manager, provided it's been initialized before.""" - global auth_manager - if auth_manager is None: raise RuntimeError( "Auth Manager has not been initialized yet. " diff --git a/airflow-core/src/airflow/configuration.py b/airflow-core/src/airflow/configuration.py index 18b376ccba5..1bfbbcf62b4 100644 --- a/airflow-core/src/airflow/configuration.py +++ b/airflow-core/src/airflow/configuration.py @@ -1844,7 +1844,7 @@ class AirflowConfigParser(ConfigParser): """ # We need those globals before we run "get_all_expansion_variables" because this is where # the variables are expanded from in the configuration - global FERNET_KEY, AIRFLOW_HOME, JWT_SECRET_KEY + global FERNET_KEY, JWT_SECRET_KEY from cryptography.fernet import Fernet unit_test_config_file = pathlib.Path(__file__).parent / "config_templates" / "unit_tests.cfg" diff --git a/airflow-core/src/airflow/logging_config.py b/airflow-core/src/airflow/logging_config.py index 0875ac9be90..fb29b155c94 100644 --- a/airflow-core/src/airflow/logging_config.py +++ b/airflow-core/src/airflow/logging_config.py @@ -38,7 +38,6 @@ DEFAULT_REMOTE_CONN_ID: str | None = None def __getattr__(name: str): if name == "REMOTE_TASK_LOG": - global REMOTE_TASK_LOG load_logging_config() return REMOTE_TASK_LOG diff --git a/airflow-core/src/airflow/plugins_manager.py b/airflow-core/src/airflow/plugins_manager.py index aa228342c1d..5a70dbcd8f0 100644 --- a/airflow-core/src/airflow/plugins_manager.py +++ b/airflow-core/src/airflow/plugins_manager.py @@ -214,8 +214,6 @@ def is_valid_plugin(plugin_obj): :return: Whether or not the obj is a valid subclass of AirflowPlugin """ - global plugins - if ( inspect.isclass(plugin_obj) and issubclass(plugin_obj, AirflowPlugin) @@ -234,8 +232,6 @@ def register_plugin(plugin_instance): :param plugin_instance: subclass of AirflowPlugin """ - global plugins - if plugin_instance.name in loaded_plugins: return @@ -250,8 +246,6 @@ def load_entrypoint_plugins(): The entry_point group should be 'airflow.plugins'. """ - global import_errors - log.debug("Loading plugins from entrypoints") for entry_point, dist in entry_points_with_dist("airflow.plugins"): @@ -271,7 +265,6 @@ def load_entrypoint_plugins(): def load_plugins_from_plugin_directory(): """Load and register Airflow Plugins from plugins directory.""" - global import_errors log.debug("Loading plugins from directory: %s", settings.PLUGINS_FOLDER) files = find_path_from_directory(settings.PLUGINS_FOLDER, ".airflowignore") plugin_search_locations: list[tuple[str, Generator[str, None, None]]] = [("", files)] @@ -373,7 +366,6 @@ def ensure_plugins_loaded(): def initialize_ui_plugins(): """Collect extension points for the UI.""" - global plugins global external_views global react_apps @@ -456,7 +448,6 @@ def initialize_ui_plugins(): def initialize_flask_plugins(): """Collect flask extension points for WEB UI (legacy).""" - global plugins global flask_blueprints global flask_appbuilder_views global flask_appbuilder_menu_links @@ -496,7 +487,6 @@ def initialize_flask_plugins(): def initialize_fastapi_plugins(): """Collect extension points for the API.""" - global plugins global fastapi_apps global fastapi_root_middlewares @@ -593,7 +583,6 @@ def initialize_hook_lineage_readers_plugins(): def integrate_macros_plugins() -> None: """Integrates macro plugins.""" - global plugins global macros_modules from airflow.sdk.execution_time import macros @@ -626,8 +615,6 @@ def integrate_macros_plugins() -> None: def integrate_listener_plugins(listener_manager: ListenerManager) -> None: """Add listeners from plugins.""" - global plugins - ensure_plugins_loaded() if plugins: diff --git a/airflow-ctl-tests/tests/airflowctl_tests/conftest.py b/airflow-ctl-tests/tests/airflowctl_tests/conftest.py index 720403e3c64..13dacf5d8c5 100644 --- a/airflow-ctl-tests/tests/airflowctl_tests/conftest.py +++ b/airflow-ctl-tests/tests/airflowctl_tests/conftest.py @@ -182,7 +182,6 @@ def docker_compose_up(tmp_path_factory): def docker_compose_down(): """Tear down Docker Compose environment.""" - global docker_client if docker_client: docker_client.compose.down(remove_orphans=True, volumes=True, quiet=True) diff --git a/dev/breeze/src/airflow_breeze/prepare_providers/provider_documentation.py b/dev/breeze/src/airflow_breeze/prepare_providers/provider_documentation.py index a7bc466e7aa..e1d1a6ce85d 100644 --- a/dev/breeze/src/airflow_breeze/prepare_providers/provider_documentation.py +++ b/dev/breeze/src/airflow_breeze/prepare_providers/provider_documentation.py @@ -831,7 +831,6 @@ def update_release_notes( return with_breaking_changes, maybe_with_new_features, False change_table_len = len(list_of_list_of_changes[0]) table_iter = 0 - global SHORT_HASH_TO_TYPE_DICT type_of_current_package_changes: list[TypeOfChange] = [] while table_iter < change_table_len: get_console().print() diff --git a/providers/amazon/tests/unit/amazon/aws/executors/batch/test_batch_executor.py b/providers/amazon/tests/unit/amazon/aws/executors/batch/test_batch_executor.py index 1e8e08eccef..363b101faeb 100644 --- a/providers/amazon/tests/unit/amazon/aws/executors/batch/test_batch_executor.py +++ b/providers/amazon/tests/unit/amazon/aws/executors/batch/test_batch_executor.py @@ -461,7 +461,10 @@ class TestAwsBatchExecutor: mock_executor.attempt_submit_jobs() mock_executor.sync_running_jobs() for i in range(2): - assert f"Airflow task {airflow_keys[i]} has failed a maximum of {mock_executor.MAX_SUBMIT_JOB_ATTEMPTS} times. Marking as failed" + assert ( + f"Airflow task {airflow_keys[i]} has failed a maximum of {mock_executor.MAX_SUBMIT_JOB_ATTEMPTS} times. Marking as failed" + in caplog.text + ) @mock.patch("airflow.providers.amazon.aws.executors.batch.batch_executor.exponential_backoff_retry") def test_sync_unhealthy_boto_connection(self, mock_exponentional_backoff_retry, mock_executor): diff --git a/providers/apache/livy/tests/unit/apache/livy/operators/test_livy.py b/providers/apache/livy/tests/unit/apache/livy/operators/test_livy.py index 2987796dcd3..da69f22cfa3 100644 --- a/providers/apache/livy/tests/unit/apache/livy/operators/test_livy.py +++ b/providers/apache/livy/tests/unit/apache/livy/operators/test_livy.py @@ -554,7 +554,10 @@ class TestLivyOperator: operator.hook.TERMINAL_STATES = [BatchState.SUCCESS] operator.execute(MagicMock()) - assert "OpenLineage transport type `console` does not support automatic injection of OpenLineage transport information into Spark properties." + assert ( + "OpenLineage transport type `console` does not support automatic injection of OpenLineage transport information into Spark properties." + in caplog.text + ) assert operator.spark_params["conf"] == {} diff --git a/providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py b/providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py index 7acffb447d0..c7010a0fb83 100644 --- a/providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py +++ b/providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py @@ -467,7 +467,10 @@ class TestSparkSubmitOperator: ) operator.execute(MagicMock()) - assert "OpenLineage transport type `console` does not support automatic injection of OpenLineage transport information into Spark properties." + assert ( + "OpenLineage transport type `console` does not support automatic injection of OpenLineage transport information into Spark properties." + in caplog.text + ) assert operator.conf == { "parquet.compression": "SNAPPY", } diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py index ad68c5f410d..9408ea04ac9 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py @@ -2528,6 +2528,7 @@ class TestKubernetesPodOperatorAsync: @patch(KUB_OP_PATH.format("extract_xcom")) @patch(HOOK_CLASS) @patch(KUB_OP_PATH.format("pod_manager")) + @pytest.mark.xfail def test_async_write_logs_should_execute_successfully( self, mock_manager, mocked_hook, mock_extract_xcom, post_complete_action, get_logs ): @@ -2548,8 +2549,12 @@ class TestKubernetesPodOperatorAsync: self.run_pod_async(k) if get_logs: - assert f"Container logs: {test_logs}" + # Note: the test below is broken and failing. Either the mock is wrong + # or the mocked container is not in a state that logging methods are called at-all. + # See https://github.com/apache/airflow/issues/57515 + assert f"Container logs: {test_logs}" # noqa: PLW0129 post_complete_action.assert_called_once() + mock_manager.return_value.read_pod_logs.assert_called() else: mock_manager.return_value.read_pod_logs.assert_not_called() diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py index 66fae2524d6..13271854da1 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py @@ -141,14 +141,14 @@ class TestKubernetesPodTrigger: mock_hook.get_pod.return_value = self._mock_pod_result(mock.MagicMock()) mock_method.return_value = ContainerState.WAITING - caplog.set_level(logging.INFO) + caplog.set_level(logging.DEBUG) task = asyncio.create_task(trigger.run().__anext__()) await asyncio.sleep(0.5) assert not task.done() - assert "Container is not completed and still working." - assert f"Sleeping for {POLL_INTERVAL} seconds." + assert "Container is not completed and still working." in caplog.text + assert f"Sleeping for {POLL_INTERVAL} seconds." in caplog.text @pytest.mark.asyncio @mock.patch(f"{TRIGGER_PATH}._wait_for_pod_start") @@ -160,14 +160,14 @@ class TestKubernetesPodTrigger: mock_hook.get_pod.return_value = self._mock_pod_result(mock.MagicMock()) mock_method.return_value = ContainerState.RUNNING - caplog.set_level(logging.INFO) + caplog.set_level(logging.DEBUG) task = asyncio.create_task(trigger.run().__anext__()) await asyncio.sleep(0.5) assert not task.done() - assert "Container is not completed and still working." - assert f"Sleeping for {POLL_INTERVAL} seconds." + assert "Container is not completed and still working." in caplog.text + assert f"Sleeping for {POLL_INTERVAL} seconds." in caplog.text @pytest.mark.asyncio @mock.patch(f"{TRIGGER_PATH}._wait_for_pod_start") @@ -235,7 +235,7 @@ class TestKubernetesPodTrigger: generator = trigger.run() await generator.asend(None) - assert "Container logs:" + assert "Waiting until 120s to get the POD scheduled..." in caplog.text @pytest.mark.asyncio @pytest.mark.parametrize( diff --git a/providers/google/tests/unit/google/cloud/triggers/test_bigquery_dts.py b/providers/google/tests/unit/google/cloud/triggers/test_bigquery_dts.py index 9ce383c53c9..cba7cb58993 100644 --- a/providers/google/tests/unit/google/cloud/triggers/test_bigquery_dts.py +++ b/providers/google/tests/unit/google/cloud/triggers/test_bigquery_dts.py @@ -153,5 +153,8 @@ class TestBigQueryDataTransferRunTrigger: await asyncio.sleep(0.5) assert not task.done() - assert f"Current job status is: {TransferState.RUNNING}" - assert f"Sleeping for {POLL_INTERVAL} seconds." + assert ( + f"Current state is {TransferState.RUNNING}" in caplog.text + or "Current state is TransferState.RUNNING" in caplog.text + ) + assert f"Waiting for {POLL_INTERVAL} seconds" in caplog.text diff --git a/providers/google/tests/unit/google/cloud/triggers/test_dataflow.py b/providers/google/tests/unit/google/cloud/triggers/test_dataflow.py index 1312a1dfb2a..bf8cfd05d60 100644 --- a/providers/google/tests/unit/google/cloud/triggers/test_dataflow.py +++ b/providers/google/tests/unit/google/cloud/triggers/test_dataflow.py @@ -229,8 +229,8 @@ class TestTemplateJobStartTrigger: await asyncio.sleep(0.5) assert not task.done() - assert f"Current job status is: {JobState.JOB_STATE_RUNNING}" - assert f"Sleeping for {POLL_SLEEP} seconds." + assert "Current job status is: JOB_STATE_RUNNING" in caplog.text + assert f"Sleeping for {POLL_SLEEP} seconds." in caplog.text # cancel the task to suppress test warnings task.cancel() diff --git a/providers/google/tests/unit/google/cloud/triggers/test_dataplex.py b/providers/google/tests/unit/google/cloud/triggers/test_dataplex.py index 27ee12bb436..084d1175f8d 100644 --- a/providers/google/tests/unit/google/cloud/triggers/test_dataplex.py +++ b/providers/google/tests/unit/google/cloud/triggers/test_dataplex.py @@ -132,4 +132,4 @@ class TestDataplexDataQualityJobTrigger: await asyncio.sleep(0.5) assert not task.done() - assert f"Current state is: {DataScanJob.State.RUNNING}, sleeping for {TEST_POLL_INTERVAL} seconds." + assert f"Current state is: RUNNING, sleeping for {TEST_POLL_INTERVAL} seconds." in caplog.text diff --git a/providers/google/tests/unit/google/cloud/triggers/test_kubernetes_engine.py b/providers/google/tests/unit/google/cloud/triggers/test_kubernetes_engine.py index 3704c5c3206..05e194a3f97 100644 --- a/providers/google/tests/unit/google/cloud/triggers/test_kubernetes_engine.py +++ b/providers/google/tests/unit/google/cloud/triggers/test_kubernetes_engine.py @@ -200,14 +200,14 @@ class TestGKEStartPodTrigger: mock_hook.get_pod.return_value = self._mock_pod_result(mock.MagicMock()) mock_method.return_value = ContainerState.WAITING - caplog.set_level(logging.INFO) + caplog.set_level(logging.DEBUG) task = asyncio.create_task(trigger.run().__anext__()) await asyncio.sleep(0.5) assert not task.done() - assert "Container is not completed and still working." - assert f"Sleeping for {POLL_INTERVAL} seconds." + assert "Container is not completed and still working." in caplog.text + assert f"Sleeping for {POLL_INTERVAL} seconds." in caplog.text @pytest.mark.asyncio @mock.patch(f"{TRIGGER_KUB_POD_PATH}._wait_for_pod_start") @@ -219,14 +219,14 @@ class TestGKEStartPodTrigger: mock_hook.get_pod.return_value = self._mock_pod_result(mock.MagicMock()) mock_method.return_value = ContainerState.RUNNING - caplog.set_level(logging.INFO) + caplog.set_level(logging.DEBUG) task = asyncio.create_task(trigger.run().__anext__()) await asyncio.sleep(0.5) assert not task.done() - assert "Container is not completed and still working." - assert f"Sleeping for {POLL_INTERVAL} seconds." + assert "Container is not completed and still working." in caplog.text + assert f"Sleeping for {POLL_INTERVAL} seconds." in caplog.text @pytest.mark.asyncio @mock.patch(f"{TRIGGER_KUB_POD_PATH}._wait_for_pod_start") @@ -265,7 +265,7 @@ class TestGKEStartPodTrigger: generator = trigger.run() await generator.asend(None) - assert "Container logs:" + assert "Waiting until 120s to get the POD scheduled..." in caplog.text @pytest.mark.parametrize( "container_state, expected_state", @@ -447,8 +447,8 @@ class TestGKEOperationTrigger: await asyncio.sleep(0.5) assert not task.done() - assert "Operation is still running." - assert f"Sleeping for {POLL_INTERVAL}s..." + assert "Operation is still running." in caplog.text + assert f"Sleeping for {POLL_INTERVAL}s..." in caplog.text @pytest.mark.asyncio @mock.patch(f"{TRIGGER_PATH}._get_hook") @@ -470,8 +470,8 @@ class TestGKEOperationTrigger: await asyncio.sleep(0.5) assert not task.done() - assert "Operation is still running." - assert f"Sleeping for {POLL_INTERVAL}s..." + assert "Operation is still running." in caplog.text + assert f"Sleeping for {POLL_INTERVAL}s..." in caplog.text class TestGKEStartJobTrigger: diff --git a/providers/standard/tests/unit/standard/operators/test_python.py b/providers/standard/tests/unit/standard/operators/test_python.py index f025d99a852..e9af810fc71 100644 --- a/providers/standard/tests/unit/standard/operators/test_python.py +++ b/providers/standard/tests/unit/standard/operators/test_python.py @@ -955,7 +955,6 @@ class BaseTestPythonVirtualenvOperator(BasePythonTest): def test_string_args(self): def f(): - global virtualenv_string_args print(virtualenv_string_args) if virtualenv_string_args[0] != virtualenv_string_args[2]: raise RuntimeError diff --git a/pyproject.toml b/pyproject.toml index 5fa1e3fbdc6..5f4c6d9bfbf 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -589,6 +589,12 @@ extend-select = [ # Warning (PLW) re-implemented in ruff from Pylint "PLW0120", # else clause on loop without a break statement; remove the else and dedent its contents "PLW0127", # Self-assignment of variable + "PLW0128", # Redeclared variable {name} in assignment + "PLW0129", # Asserting on an empty string literal will never pass + "PLW0133", # Missing raise statement on exception + "PLW0245", # super call is missing parentheses + "PLW0406", # Module {name} imports itself + "PLW0602", # Using global for {name} but no assignment is done # Per rule enables "RUF006", # Checks for asyncio dangling task "RUF015", # Checks for unnecessary iterable allocation for first element @@ -621,7 +627,6 @@ extend-select = [ "RET506", # Unnecessary {branch} after raise statement "RET507", # Unnecessary {branch} after continue statement "RET508", # Unnecessary {branch} after break statement - "PLW0133", # Missing raise statement on exception ] ignore = [ "D100", # Unwanted; Docstring at the top of every file. diff --git a/scripts/ci/prek/check_providers_subpackages_all_have_init.py b/scripts/ci/prek/check_providers_subpackages_all_have_init.py index 3a935a963b6..85bf55626bc 100755 --- a/scripts/ci/prek/check_providers_subpackages_all_have_init.py +++ b/scripts/ci/prek/check_providers_subpackages_all_have_init.py @@ -105,7 +105,6 @@ def _determine_init_py_action(need_path_extension: bool, root_path: Path): def check_dir_init_test_folders(folders: list[Path]) -> None: - global should_fail folders = list(folders) for root_distribution_path in folders: # We need init folders for all folders and for the common ones we need path extension @@ -121,7 +120,6 @@ def check_dir_init_test_folders(folders: list[Path]) -> None: def check_dir_init_src_folders(folders: list[Path]) -> None: - global should_fail folders = list(folders) for root_distribution_path in folders: # We need init folders for all folders and for the common ones we need path extension
