blag commented on PR #27745:
URL: https://github.com/apache/airflow/pull/27745#issuecomment-1323102173
@jedcunningham is correct. While this works when you use `airflow dags
reserialize` (which is what I was using to exercise this code during
development), it doesn't work with the DAG processors, since those spin off a
process per `.py` file and none of them have a global view of all referenced
datasets in all DAGs across all Python files/modules.
So I don't think this approach is going to work after all without a large
amount of refactoring. 😕 Closing.
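To illustrate the problem, here is a minimal sketch in plain Python (not Airflow code; the file names and dataset URIs are made up). Deciding which datasets are referenced anywhere requires a union over every file, and no single per-file processor can take that union:

```python
# Illustrative only: each DAG processor sees the datasets of one file.
datasets_by_file = {
    "dag_a.py": {"s3://bucket/a"},
    "dag_b.py": {"s3://bucket/a", "s3://bucket/b"},
}

# A global view (what `airflow dags reserialize` effectively has, via a
# DagBag built over the whole dags folder) is the union across all files:
globally_referenced = set().union(*datasets_by_file.values())
print(sorted(globally_referenced))  # ['s3://bucket/a', 's3://bucket/b']

# A per-file processor handling only dag_a.py sees just its own file, so
# "s3://bucket/b" looks unreferenced even though dag_b.py still uses it:
local_view = datasets_by_file["dag_a.py"]
print(sorted(globally_referenced - local_view))  # ['s3://bucket/b']
```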
<details><summary>Click to expand the code trace</summary>
I've been using `airflow dags reserialize` to reload DAGs. That command is
mapped in `airflow.cli.cli_parser` to
`airflow.cli.commands.dag_command.dag_reserialize`.
`airflow/cli/commands/dag_command.py`:
```python
@provide_session
@cli_utils.action_cli
def dag_reserialize(args, session: Session = NEW_SESSION):
"""Serialize a DAG instance."""
session.query(SerializedDagModel).delete(synchronize_session=False)
if not args.clear_only:
dagbag = DagBag(process_subdir(args.subdir))
...
```
`airflow/models/dagbag.py`:
```python
class DagBag(LoggingMixin):
...
def __init__(
self,
dag_folder: str | pathlib.Path | None = None,
...
collect_dags: bool = True,
):
...
self.dags: dict[str, DAG] = {}
...
if collect_dags:
self.collect_dags(
dag_folder=dag_folder,
include_examples=include_examples,
safe_mode=safe_mode,
)
...
...
def collect_dags(
self,
dag_folder: str | pathlib.Path | None = None,
only_if_updated: bool = True,
include_examples: bool = conf.getboolean("core", "LOAD_EXAMPLES"),
safe_mode: bool = conf.getboolean("core", "DAG_DISCOVERY_SAFE_MODE"),
):
...
# Ensure dag_folder is a str -- it may have been a pathlib.Path
dag_folder = correct_maybe_zipped(str(dag_folder))
for filepath in list_py_file_paths(
dag_folder,
safe_mode=safe_mode,
include_examples=include_examples,
):
try:
file_parse_start_dttm = timezone.utcnow()
found_dags = self.process_file(filepath,
only_if_updated=only_if_updated, safe_mode=safe_mode)
...
...
...
...
def process_file(self, filepath, only_if_updated=True, safe_mode=True):
...
if filepath.endswith(".py") or not zipfile.is_zipfile(filepath):
mods = self._load_modules_from_file(filepath, safe_mode)
else:
mods = self._load_modules_from_zip(filepath, safe_mode)
found_dags = self._process_modules(filepath, mods,
file_last_changed_on_disk)
...
...
def _process_modules(self, filepath, mods, file_last_changed_on_disk):
...
for (dag, mod) in top_level_dags:
try:
dag.validate()
self.bag_dag(dag=dag, root_dag=dag)
...
...
...
def bag_dag(self, dag, root_dag):
...
self._bag_dag(dag=dag, root_dag=root_dag, recursive=True)
def _bag_dag(self, *, dag, root_dag, recursive):
...
try:
...
self.dags[dag.dag_id] = dag
...
...
```
So at this point in `airflow dags reserialize`, all of the DAGs have been
read and collected into `dagbag.dags`, and the next call writes them to the DB.
`airflow/cli/commands/dag_command.py`:
```python
@provide_session
@cli_utils.action_cli
def dag_reserialize(args, session: Session = NEW_SESSION):
"""Serialize a DAG instance."""
session.query(SerializedDagModel).delete(synchronize_session=False)
if not args.clear_only:
...
dagbag.sync_to_db(session=session)
```
`airflow/models/dagbag.py`:
```python
class DagBag(LoggingMixin):
...
@provide_session
def sync_to_db(self, processor_subdir: str | None = None, session: Session = None):
...
for attempt in run_with_db_retries(logger=self.log):
with attempt:
...
try:
...
DAG.bulk_write_to_db(
self.dags.values(),
processor_subdir=processor_subdir, session=session
)
```
However, the DAG processor handles files one at a time, never as a batch.
`airflow/dag_processing/processor.py`:
```python
class DagFileProcessorProcess(LoggingMixin, MultiprocessingStartMethodMixin):
...
def start(self) -> None:
...
process = context.Process(
target=type(self)._run_file_processor,
...
)
...
process.start()
...
@staticmethod
def _run_file_processor(
result_channel: MultiprocessingConnection,
parent_channel: MultiprocessingConnection,
file_path: str,
pickle_dags: bool,
dag_ids: list[str] | None,
thread_name: str,
dag_directory: str,
callback_requests: list[CallbackRequest],
) -> None:
...
def _handle_dag_file_processing():
...
dag_file_processor = DagFileProcessor(dag_ids=dag_ids,
dag_directory=dag_directory, log=log)
result: tuple[int, int] = dag_file_processor.process_file(
file_path=file_path,
pickle_dags=pickle_dags,
callback_requests=callback_requests,
)
...
try:
...
# The following line ensures that stdout goes to the same destination as
# the logs. If stdout gets sent to logs and logs are sent to stdout, this
# leads to an infinite loop. This necessitates this conditional based on
# the value of DAG_PROCESSOR_LOG_TARGET.
with redirect_stdout(StreamLogWriter(log, logging.INFO)), redirect_stderr(
    StreamLogWriter(log, logging.WARN)
), Stats.timer() as timer:
_handle_dag_file_processing()
...
...
class DagFileProcessor(LoggingMixin):
...
@provide_session
def process_file(
self,
file_path: str,
callback_requests: list[CallbackRequest],
pickle_dags: bool = False,
session: Session = NEW_SESSION,
) -> tuple[int, int]:
...
try:
dagbag = DagBag(file_path, include_examples=False)
...
dagbag.sync_to_db(processor_subdir=self._dag_directory,
session=session)
```
So from within a single file's processor, there is no way to determine which
datasets are still in use and which are not.
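A tiny sketch of the computation orphan detection would need (plain Python with hypothetical names, not a real Airflow helper): it takes every DAG's dataset references at once, which only the batch path could supply.

```python
def orphaned_datasets(known: set[str], per_dag_refs: list[set[str]]) -> set[str]:
    """Return datasets in `known` that no DAG still references (hypothetical)."""
    still_referenced = set().union(*per_dag_refs) if per_dag_refs else set()
    return known - still_referenced

# With a global view (the reserialize path), the answer is correct:
print(orphaned_datasets({"a", "b", "c"}, [{"a"}, {"b"}]))  # {'c'}

# With a single file's view (the DAG processor path), "b" is wrongly
# reported as orphaned, because the other file's reference is invisible:
print(sorted(orphaned_datasets({"a", "b", "c"}, [{"a"}])))  # ['b', 'c']
```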
</details>