diogosilva30 commented on PR #65943:
URL: https://github.com/apache/airflow/pull/65943#issuecomment-4386327199
@jscheffl yes, this is intentional. We ship DAG factories and reusable
operators as modules inside the `plugins/` folder so multiple DAGs can be
instantiated from the same logic without duplication.
**Pattern overview:**
```
plugins/
└── common/
    └── operators/
        └── example_dag_factory.py   ← shared factory + tasks
dags/
└── prod/
    └── my_dag.py                    ← thin wrapper that calls the factory
```
**`plugins/common/operators/example_dag_factory.py`** (shared logic):
```python
"""Reusable DAG factory for fetching and exporting metrics."""
from datetime import timedelta

from airflow.sdk import task


@task
def fetch_data(source: str) -> list[dict]:
    """Fetch records from a data source."""
    # ... implementation ...
    return []


@task
def export_metrics(data: list[dict], conn_id: str) -> None:
    """Export metrics via an external connection."""
    # ... implementation ...


def metrics_dag_definition(source: str, conn_id: str) -> None:
    """Wire up the DAG tasks."""
    data = fetch_data(source=source)
    export_metrics(data=data, conn_id=conn_id)


# Keyword arguments shared by every DAG built from this factory.
metrics_dag_kwargs = {
    "schedule": timedelta(minutes=5),
    "tags": ["metrics"],
}
```
**`dags/prod/my_dag.py`** (thin DAG file):
```python
"""DAG for exporting prod metrics."""
import functools

from common import create_dag
from common.operators.example_dag_factory import (
    metrics_dag_definition,
    metrics_dag_kwargs,
)

create_dag(
    dag_name="prod_metrics",
    # Bind the environment-specific arguments; the factory receives a zero-argument callable.
    dag_definition=functools.partial(
        metrics_dag_definition,
        source="prod",
        conn_id="metrics_conn_prod",
    ),
    dag_file=__file__,
    **metrics_dag_kwargs,
)
```
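
The `create_dag` helper imported from `common` is not shown in this comment. As a rough, Airflow-free sketch of the calling convention only, the stand-in below assumes the helper invokes the zero-argument `dag_definition` callable and forwards the remaining keyword arguments. The body of `create_dag`, the returned dict, and the `wired` list are illustrative assumptions, not our actual implementation:

```python
"""Airflow-free sketch of the factory call; NOT the real `common.create_dag`."""
import functools
from typing import Any, Callable

# Records which (source, conn_id) pairs were wired up, purely for illustration.
wired: list[tuple[str, str]] = []


def metrics_dag_definition(source: str, conn_id: str) -> None:
    """Stand-in for the shared definition; the real one calls @task functions."""
    wired.append((source, conn_id))


def create_dag(
    dag_name: str,
    dag_definition: Callable[[], None],
    dag_file: str,
    **dag_kwargs: Any,
) -> dict[str, Any]:
    """Hypothetical helper: run the definition and describe the resulting DAG."""
    dag_definition()  # arguments were already bound by functools.partial
    return {"dag_id": dag_name, "fileloc": dag_file, **dag_kwargs}


dag = create_dag(
    dag_name="prod_metrics",
    dag_definition=functools.partial(
        metrics_dag_definition,
        source="prod",
        conn_id="metrics_conn_prod",
    ),
    dag_file="dags/prod/my_dag.py",
    tags=["metrics"],
)
```

The point of `functools.partial` here is that the helper never needs to know which arguments a given definition takes; each DAG file binds its own values up front.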
The DAG file itself is essentially a single `create_dag` call; all the task
logic lives in the shared plugin module. Because the worker has to execute the
tasks defined in that module, it must be able to import from `plugins/`, which
is why this pattern requires plugins to be loaded on the worker.
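
To illustrate the import dependency in isolation, here is a small stdlib-only sketch. It builds a throwaway `plugins/` tree and shows that the shared module resolves only once that folder is on `sys.path`. The package name `demo_common` and the helper functions are made up for the demo, and this only mimics the effect; it is not how the PR itself makes plugins available:

```python
"""Sketch: a module under plugins/ is importable only if plugins/ is on sys.path."""
import importlib
import sys
import tempfile
from pathlib import Path
from types import ModuleType
from typing import Optional

MODULE_NAME = "demo_common.operators.example_dag_factory"  # hypothetical name


def write_plugin_module(plugins_dir: Path) -> None:
    """Create plugins/demo_common/operators/example_dag_factory.py on disk."""
    operators_dir = plugins_dir / "demo_common" / "operators"
    operators_dir.mkdir(parents=True)
    (plugins_dir / "demo_common" / "__init__.py").write_text("")
    (operators_dir / "__init__.py").write_text("")
    (operators_dir / "example_dag_factory.py").write_text(
        "def fetch_data(source):\n    return [{'source': source}]\n"
    )


def try_import(name: str) -> Optional[ModuleType]:
    """Return the imported module, or None if it cannot be found."""
    importlib.invalidate_caches()
    try:
        return importlib.import_module(name)
    except ModuleNotFoundError:
        return None


with tempfile.TemporaryDirectory() as tmp:
    plugins_dir = Path(tmp) / "plugins"
    write_plugin_module(plugins_dir)

    # Without plugins/ on sys.path the task code cannot be resolved.
    before = try_import(MODULE_NAME)

    # With plugins/ on sys.path the same import succeeds.
    sys.path.insert(0, str(plugins_dir))
    try:
        after = try_import(MODULE_NAME)
        result = after.fetch_data("prod") if after is not None else None
    finally:
        sys.path.remove(str(plugins_dir))
```

The first lookup corresponds to a worker that has not loaded the plugins folder; the second corresponds to one that has.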
Regarding the `core.execute_tasks_new_python_interpreter` proposal: that
sounds like a clean approach and aligns well with the existing executor
pattern. I'll work on updating the PR to honour that flag rather than
hard-switching.