thehalleffect commented on issue #56657:
URL: https://github.com/apache/airflow/issues/56657#issuecomment-3519137174
For whoever is still trying to work around this: I've untangled what's going
on a bit. Whenever you import ANYTHING that transitively imports the config
module, it eagerly loads its defaults, so env vars you set afterwards (e.g.
`AIRFLOW__CORE__DAGS_FOLDER` in a test fixture) silently have no effect on how
DagBag loads your DAGs, which is pretty opaque to debug. I've come up with this
as a (kinda crappy) workaround in one of the `conftest.py`s in our project:
```python
import json

import pytest

# `path_for` is a project-local helper (see note below); import it from wherever it lives.


@pytest.fixture(scope="session")
def monkeysession(request):
    """
    session-scoped monkeypatching
    """
    with pytest.MonkeyPatch.context() as mp:
        yield mp


...  # other fixtures elided


@pytest.fixture()
def dagbag_dir(monkeysession):
    """
    very, ***very*** annoyingly, this does nothing, because the config value is
    eagerly loaded when importing airflow components *before* this has a chance
    to be monkeypatched.

    leaving this in, because they'll probably fix it at some point, and this is
    a cleaner way to point the dag processor at the correct dag dir.
    """
    monkeysession.setenv(
        "AIRFLOW__CORE__DAGS_FOLDER",
        str(path_for(resource="dags")),  # or wherever your dags live
    )


@pytest.fixture(scope="session", autouse=True)
def _force_correct_dag_bundle_loading(monkeysession):
    """
    force the airflow config crap to load things properly.

    any airflow component that touches `airflow.settings` for any reason will
    make that module eagerly load a bunch of constants. this means that if you
    import something like airflow.sdk.DAG in tests.integration.conftest and
    monkeypatch the `AIRFLOW__CORE__DAGS_FOLDER` env var, *it doesn't actually
    affect how the dags are loaded for testing*.

    this, combined with the `force_dag_serialization` util function (or the
    similar logic in DAG.test), will work around that.
    """
    monkeysession.setenv(
        "AIRFLOW__DAG_PROCESSOR__DAG_BUNDLE_CONFIG_LIST",
        json.dumps(
            [
                dict(
                    name="dags-folder",
                    classpath="airflow.dag_processing.bundles.local.LocalDagBundle",
                    # or wherever your dags live
                    kwargs=dict(path=str(path_for(resource="dags"))),
                )
            ]
        ),
    )
```
(`path_for` is just a little utility shim for grabbing the absolute path of
some project resource relative to the project root.)
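The comment doesn't show `path_for` itself. If you need something similar, here is one possible sketch (hypothetical, not the author's helper): it walks up from a starting directory until it finds a marker file, assumed here to be `pyproject.toml`, and treats that directory as the project root. The names `project_root`, `ROOT_MARKER`, and the `start` parameter are made up for illustration.

```python
from __future__ import annotations

from pathlib import Path

# Assumption: the project root is the nearest ancestor directory containing this file.
ROOT_MARKER = "pyproject.toml"


def project_root(start: Path | None = None) -> Path:
    """Return the closest directory at or above `start` that contains ROOT_MARKER."""
    # Default to this file's directory, e.g. when called from a conftest.py.
    here = (start if start is not None else Path(__file__).parent).resolve()
    for candidate in (here, *here.parents):
        if (candidate / ROOT_MARKER).is_file():
            return candidate
    raise FileNotFoundError(f"no {ROOT_MARKER} found at or above {here}")


def path_for(*, resource: str, start: Path | None = None) -> Path:
    """Absolute path of `resource`, interpreted relative to the project root."""
    return project_root(start) / resource
```

In a `conftest.py`, `str(path_for(resource="dags"))` would then give an absolute path suitable for the `LocalDagBundle` `path` kwarg in the fixture above.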
`DAG.test` should now work.
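Why the plain `AIRFLOW__CORE__DAGS_FOLDER` fixture is a no-op comes down to ordinary Python import caching: a value read from the environment when a module is first imported is frozen, and a later `setenv` never reaches it. Below is a toy model of that behavior. It is not Airflow's code; `fake_config`, `import_fake_config`, `lazy_dags_folder`, and the default path are all made up for illustration.

```python
import os
import sys
import types

ENV_VAR = "AIRFLOW__CORE__DAGS_FOLDER"
DEFAULT = "/placeholder/default/dags"  # made-up default, for illustration only


def import_fake_config() -> types.ModuleType:
    """Mimic `import fake_config`: the module body runs once, later imports hit the cache."""
    cached = sys.modules.get("fake_config")
    if cached is not None:
        return cached
    mod = types.ModuleType("fake_config")
    # Module-level constant: the env var is read exactly once, at first import.
    mod.DAGS_FOLDER = os.environ.get(ENV_VAR, DEFAULT)
    sys.modules["fake_config"] = mod
    return mod


def lazy_dags_folder() -> str:
    """Read the env var at call time instead of at import time."""
    return os.environ.get(ENV_VAR, DEFAULT)


if __name__ == "__main__":
    os.environ.pop(ENV_VAR, None)
    cfg = import_fake_config()  # like conftest importing airflow.sdk.DAG at collection time
    os.environ[ENV_VAR] = "/repo/tests/dags"  # like the fixture's setenv, which runs later
    print(cfg.DAGS_FOLDER)  # /placeholder/default/dags (stale snapshot)
    print(import_fake_config().DAGS_FOLDER)  # still stale: the "import" is cached
    print(lazy_dags_folder())  # /repo/tests/dags
```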
If you also need to run specific task instances manually in your tests
without the boilerplate (and execution time) of invoking `test` on the WHOLE
DAG (which is something we do in our project), you can also add this bit of
hackiness, ripped from `DAG.test` and simplified/massaged a little:
```python
from __future__ import annotations  # allows `X | Y` annotations on Python < 3.10

from datetime import datetime

from airflow import settings
from airflow.sdk import DAG, timezone
from airflow.serialization.serialized_objects import SerializedDAG
from airflow.utils.types import NOTSET, ArgNotSet


def force_dag_serialization(
    *,
    dag: DAG,
    logical_date: datetime | None | ArgNotSet = NOTSET,
    session: settings.Session,
) -> SerializedDAG:
    """
    ripped from airflow.sdk.definitions.DAG.test and hacked for our use.
    """
    dag.validate()
    # Fall back to "now" only when the caller didn't pass a logical date at all.
    logical_date = (
        logical_date if not isinstance(logical_date, ArgNotSet) else timezone.utcnow()
    )
    SerializedDAG.clear_dags(
        dags=[dag],
        start_date=logical_date,
        end_date=logical_date,
        dag_run_state=False,
    )

    from airflow.models.dag_version import DagVersion

    version = DagVersion.get_version(dag.dag_id)
    if not version:
        # No stored version yet: parse the bundle ourselves and sync it to the DB.
        from airflow.dag_processing.bundles.manager import DagBundlesManager
        from airflow.models.dagbag import DagBag, sync_bag_to_db
        from airflow.sdk.definitions._internal.dag_parsing_context import (
            _airflow_parsing_context_manager,
        )

        manager = DagBundlesManager()
        manager.sync_bundles_to_db(session=session)
        session.commit()
        for bundle in manager.get_all_dag_bundles():
            if bundle.name != "dags-folder":
                # skip crap we don't care about (example dags)
                continue
            if not bundle.is_initialized:
                bundle.initialize()
            with _airflow_parsing_context_manager(dag_id=dag.dag_id):
                dagbag = DagBag(
                    dag_folder=bundle.path.resolve(),
                    bundle_path=bundle.path.resolve(),
                    include_examples=False,
                )
                sync_bag_to_db(dagbag, bundle.name, bundle.version)
            version = DagVersion.get_version(dag.dag_id)
            if version:
                break

    # Serialize/deserialize round trip, then re-attach the callbacks explicitly.
    serialized_dag = SerializedDAG.deserialize_dag(SerializedDAG.serialize_dag(dag))
    serialized_dag.on_success_callback = dag.on_success_callback  # type: ignore[attr-defined, union-attr]
    serialized_dag.on_failure_callback = dag.on_failure_callback  # type: ignore[attr-defined, union-attr]
    return serialized_dag
```
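A side note on the `logical_date: ... = NOTSET` default in `force_dag_serialization`: a sentinel object, rather than `None`, marks "no argument passed", so an explicit `None` is passed through instead of being replaced with the current time. Here is a standalone sketch of that pattern. It uses local stand-ins for `ArgNotSet`/`NOTSET` rather than Airflow's classes, and `resolve_logical_date` is a hypothetical name.

```python
from __future__ import annotations

from datetime import datetime, timezone


class ArgNotSet:
    """Local stand-in for a "no argument was passed" marker type (not Airflow's class)."""

    def __repr__(self) -> str:
        return "NOTSET"


NOTSET = ArgNotSet()  # single shared instance used as the default


def resolve_logical_date(logical_date: datetime | None | ArgNotSet = NOTSET) -> datetime | None:
    # Substitute "now" only when the caller passed nothing at all;
    # an explicit None (or a real datetime) is returned unchanged.
    if isinstance(logical_date, ArgNotSet):
        return datetime.now(timezone.utc)
    return logical_date


print(resolve_logical_date(None))  # None
print(resolve_logical_date(datetime(2025, 1, 1, tzinfo=timezone.utc)))  # 2025-01-01 00:00:00+00:00
print(resolve_logical_date().tzinfo)  # UTC
```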
All that said: this was WAY too much digging to get back functionality that
worked fine before. `DAG.test` definitely needs some regression tests wrapped
around it (it's just tests all the way down, I guess? 🤪).