thehalleffect commented on issue #56657:
URL: https://github.com/apache/airflow/issues/56657#issuecomment-3519137174

   For whoever is still trying to work around this, I've untangled what's going on a bit. Whenever you import ANYTHING that transitively imports the config module, it eagerly loads the config defaults in a way that hoses dagbag loading in a pretty opaque fashion. I've come up with this as a (kinda crappy) workaround in one of the `conftest.py`s in our project:
   ```python
   import json

   import pytest
   @pytest.fixture(scope="session")
   def monkeysession(request):
       """
       session-scoped monkeypatching
       """
       with pytest.MonkeyPatch.context() as mp:
           yield mp
   
   
   ...
   
   
   @pytest.fixture()
   def dagbag_dir(monkeysession):
       """
       very, ***very*** annoyingly, this does nothing, because the config value is eagerly
       loaded when importing airflow components *before* this has a chance to be monkeypatched.
       leaving this in, because they'll probably fix it at some point, and this is a cleaner
       way to point the dag processor at the correct dag dir.
       """
       monkeysession.setenv(
           "AIRFLOW__CORE__DAGS_FOLDER",
           str(path_for(resource="dags")),  # or wherever your dags live
       )
   
   
   @pytest.fixture(scope="session", autouse=True)
   def _force_correct_dag_bundle_loading(monkeysession):
       """
       force the airflow config crap to load things properly.
       any airflow component that touches `airflow.settings` for any reason will make that module
       eagerly load a bunch of constants. this means that if you import something like airflow.sdk.DAG
       in tests.integration.conftest and monkeypatch the `AIRFLOW__CORE__DAGS_FOLDER` env var,
       *it doesn't actually affect how the dags are loaded for testing*.

       this, combined with the `force_dag_serialization` util function (or the similar
       logic in DAG.test), will work around that.
       """
       monkeysession.setenv(
           "AIRFLOW__DAG_PROCESSOR__DAG_BUNDLE_CONFIG_LIST",
           json.dumps(
               [
                   dict(
                       name="dags-folder",
                       classpath="airflow.dag_processing.bundles.local.LocalDagBundle",
                       kwargs=dict(path=str(path_for(resource="dags"))),  # or wherever your dags live
                   )
               ]
           ),
       )
   ```
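
   the underlying failure mode is plain import-time eagerness, and it's reproducible without airflow at all. here's a stdlib-only sketch (module and env var names are made up) of why monkeypatching the env var after the import does nothing:

   ```python
   import os

   os.environ["MYAPP__DAGS_FOLDER"] = "/default/dags"

   # simulate "import config": the env var is read eagerly, exactly once,
   # and baked into a module-level constant
   DAGS_FOLDER = os.environ["MYAPP__DAGS_FOLDER"]

   # a fixture later monkeypatches the env var...
   os.environ["MYAPP__DAGS_FOLDER"] = "/my/test/dags"

   # ...but anything holding the eagerly-loaded constant never sees the change
   print(DAGS_FOLDER)  # still "/default/dags"
   ```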
   
   (`path_for` is just a little utility shim for grabbing the absolute path of some project resource relative to the project root)
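
   if you need a starting point, a minimal `path_for` could look something like this — a sketch only, which assumes the project root is the directory containing the file that defines it (the real helper in your project will differ):

   ```python
   from pathlib import Path

   # assumption: the project root is the directory containing this file;
   # bump .parent as needed for wherever your conftest/utility module lives.
   # (the globals() guard just keeps this sketch runnable outside a file)
   PROJECT_ROOT = (
       Path(__file__).resolve().parent if "__file__" in globals() else Path.cwd()
   )


   def path_for(*, resource: str) -> Path:
       """absolute path of a project resource (e.g. "dags") under the project root."""
       return (PROJECT_ROOT / resource).resolve()
   ```

   so `path_for(resource="dags")` resolves to `<project root>/dags` as an absolute path.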
   
   `DAG.test` should now work.
   
   If you also need to run specific task instances manually in your tests without dealing with the boilerplate (and execution time) of invoking `test` on the WHOLE dag (which is a thing we do in our project), you can add this hackiness (ripped from `DAG.test` and simplified/massaged a little):
   
   ```python
   from datetime import datetime
   
   from airflow import settings
   from airflow.sdk import DAG, timezone
   from airflow.serialization.serialized_objects import SerializedDAG
   from airflow.utils.types import ArgNotSet, NOTSET
   
   
   def force_dag_serialization(
       *,
       dag: DAG,
       logical_date: datetime | None | ArgNotSet = NOTSET,
       session: settings.Session,
   ) -> SerializedDAG:
       """
       ripped from airflow.sdk.definitions.DAG.test and hacked for our use.
       """
       dag.validate()
   
       logical_date = (
           logical_date if not isinstance(logical_date, ArgNotSet) else timezone.utcnow()
       )
   
       SerializedDAG.clear_dags(
           dags=[dag],
           start_date=logical_date,
           end_date=logical_date,
           dag_run_state=False,
       )
   
       from airflow.models.dag_version import DagVersion
   
       version = DagVersion.get_version(dag.dag_id)
       if not version:
           from airflow.dag_processing.bundles.manager import DagBundlesManager
           from airflow.models.dagbag import DagBag, sync_bag_to_db
           from airflow.sdk.definitions._internal.dag_parsing_context import (
               _airflow_parsing_context_manager,
           )
   
           manager = DagBundlesManager()
           manager.sync_bundles_to_db(session=session)
           session.commit()
   
           for bundle in manager.get_all_dag_bundles():
               if bundle.name != "dags-folder":
                   # skip crap we don't care about (example dags)
                   continue
               if not bundle.is_initialized:
                   bundle.initialize()
               with _airflow_parsing_context_manager(dag_id=dag.dag_id):
                   dagbag = DagBag(
                       dag_folder=bundle.path.resolve(),
                       bundle_path=bundle.path.resolve(),
                       include_examples=False,
                   )
                   sync_bag_to_db(dagbag, bundle.name, bundle.version)
               version = DagVersion.get_version(dag.dag_id)
               if version:
                   break
       serialized_dag = SerializedDAG.deserialize_dag(SerializedDAG.serialize_dag(dag))
   
       serialized_dag.on_success_callback = dag.on_success_callback  # type: ignore[attr-defined, union-attr]
       serialized_dag.on_failure_callback = dag.on_failure_callback  # type: ignore[attr-defined, union-attr]
   
       return serialized_dag
   ```
   
   All that said: this was WAY too much digging to get back functionality that worked alright before. `DAG.test` definitely needs some regression testing (it's just tests all the way down, I guess? 🤪) wrapped around it.

