kaxil commented on issue #56769:
URL: https://github.com/apache/airflow/issues/56769#issuecomment-3416581902
I looked at the source code of the original package; it doesn't do anything
very complicated for what we need.
cc @jatandewgun, since you mentioned that you might be interested in working
on it.
Here is my draft -- feel free to reuse it:
```diff
diff --git a/airflow-core/pyproject.toml b/airflow-core/pyproject.toml
index b9d1d5c6ab..078c3cc268 100644
--- a/airflow-core/pyproject.toml
+++ b/airflow-core/pyproject.toml
@@ -93,7 +93,6 @@ dependencies = [
"lazy-object-proxy>=1.2.0",
'libcst >=1.8.2',
"linkify-it-py>=2.0.0",
- "lockfile>=0.12.2",
"methodtools>=0.4.7",
"natsort>=8.4.0",
"opentelemetry-api>=1.27.0",
diff --git a/airflow-core/src/airflow/cli/commands/daemon_utils.py
b/airflow-core/src/airflow/cli/commands/daemon_utils.py
index c55c12b380..64039f91cd 100644
--- a/airflow-core/src/airflow/cli/commands/daemon_utils.py
+++ b/airflow-core/src/airflow/cli/commands/daemon_utils.py
@@ -21,9 +21,9 @@ from argparse import Namespace
from collections.abc import Callable
from daemon import daemon
-from daemon.pidfile import TimeoutPIDLockFile
from airflow import settings
+from airflow.utils.pidfile import TimeoutPIDLockFile
from airflow.utils.cli import setup_locations, setup_logging,
sigint_handler, sigquit_handler
from airflow.utils.process_utils import check_if_pidfile_process_is_running
diff --git a/airflow-core/src/airflow/dag_processing/processor.py
b/airflow-core/src/airflow/dag_processing/processor.py
index 252a4280ee..0ebc613b32 100644
--- a/airflow-core/src/airflow/dag_processing/processor.py
+++ b/airflow-core/src/airflow/dag_processing/processor.py
@@ -160,7 +160,7 @@ def _pre_import_airflow_modules(file_path: str, log:
FilteringBoundLogger) -> No
for module in iter_airflow_imports(file_path):
try:
importlib.import_module(module)
- except ModuleNotFoundError as e:
+ except Exception as e:
log.warning("Error when trying to pre-import module '%s' found
in %s: %s", module, file_path, e)
diff --git a/airflow-core/src/airflow/utils/process_utils.py
b/airflow-core/src/airflow/utils/process_utils.py
index f46b90bcbf..0d029fdbd3 100644
--- a/airflow-core/src/airflow/utils/process_utils.py
+++ b/airflow-core/src/airflow/utils/process_utils.py
@@ -41,9 +41,9 @@ from collections.abc import Generator
from contextlib import contextmanager
import psutil
-from lockfile.pidlockfile import PIDLockFile
from airflow.configuration import conf
+from airflow.utils.pidfile import PIDLockFile
from airflow.exceptions import AirflowException
log = logging.getLogger(__name__)
diff --git a/airflow-core/tests/unit/dag_processing/test_processor.py
b/airflow-core/tests/unit/dag_processing/test_processor.py
index f8699cb76a..59c8432a35 100644
--- a/airflow-core/tests/unit/dag_processing/test_processor.py
+++ b/airflow-core/tests/unit/dag_processing/test_processor.py
@@ -402,23 +402,28 @@ class TestDagFileProcessor:
mock_import.assert_called_once_with("airflow.models")
logger.warning.assert_not_called()
- def test__pre_import_airflow_modules_warns_on_missing_module(self):
+ @pytest.mark.parametrize(
+ "exception",
+ [
+ ModuleNotFoundError("module not found"),
+ RuntimeError("import failed"),
+ ImportError("import error"),
+ ],
+ )
+ def test__pre_import_airflow_modules_warns_on_import_errors(self,
exception):
+ """Test that pre-import logs warnings for any import exception
type."""
logger = MagicMock(spec=FilteringBoundLogger)
with (
env_vars({"AIRFLOW__DAG_PROCESSOR__PARSING_PRE_IMPORT_MODULES":
"true"}),
- patch(
- "airflow.dag_processing.processor.iter_airflow_imports",
return_value=["non_existent_module"]
- ),
- patch(
- "airflow.dag_processing.processor.importlib.import_module",
side_effect=ModuleNotFoundError()
- ),
+ patch("airflow.dag_processing.processor.iter_airflow_imports",
return_value=["some_module"]),
+
patch("airflow.dag_processing.processor.importlib.import_module",
side_effect=exception),
):
_pre_import_airflow_modules("test.py", logger)
logger.warning.assert_called_once()
warning_args = logger.warning.call_args[0]
assert "Error when trying to pre-import module" in warning_args[0]
- assert "non_existent_module" in warning_args[1]
+ assert "some_module" in warning_args[1]
assert "test.py" in warning_args[2]
def test__pre_import_airflow_modules_partial_success_and_warning(self):
diff --git
a/providers/celery/src/airflow/providers/celery/cli/celery_command.py
b/providers/celery/src/airflow/providers/celery/cli/celery_command.py
index 5a79d260e8..d1a784a95d 100644
--- a/providers/celery/src/airflow/providers/celery/cli/celery_command.py
+++ b/providers/celery/src/airflow/providers/celery/cli/celery_command.py
@@ -30,9 +30,12 @@ import sqlalchemy.exc
from celery import maybe_patch_concurrency
from celery.app.defaults import DEFAULT_TASK_LOG_FMT
from celery.signals import after_setup_logger
-from lockfile.pidlockfile import read_pid_from_pidfile,
remove_existing_pidfile
from airflow import settings
+try:
+ from lockfile.pidlockfile import read_pid_from_pidfile,
remove_existing_pidfile
+except ImportError:
+ from airflow.utils.pidfile import read_pid_from_pidfile,
remove_existing_pidfile
from airflow.cli.simple_table import AirflowConsole
from airflow.configuration import conf
from airflow.exceptions import AirflowConfigException
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]