This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 2afd6d1a3ac Bring back support for local dagbag based list/list-import-errors (#49380)
2afd6d1a3ac is described below

commit 2afd6d1a3ac471bd01b1c8f66d1bbb5ac9ea92a4
Author: Dheeraj Turaga <[email protected]>
AuthorDate: Sun Apr 20 15:36:02 2025 -0500

    Bring back support for local dagbag based list/list-import-errors (#49380)
    
    * Bring back support for localdagbag based list/list-imporr-errors
    
    * fix typo
    
    * Fix unit tests - restore db back to its orig state for downstream tests
    
    * Absorb Ruff linting to fix CI static checks
    
    * better help message
    
    Co-authored-by: Jens Scheffler <[email protected]>
    
    ---------
    
    Co-authored-by: Jens Scheffler <[email protected]>
---
 airflow-core/src/airflow/cli/cli_config.py         | 11 ++-
 .../src/airflow/cli/commands/dag_command.py        | 96 ++++++++++++++++------
 .../tests/unit/cli/commands/test_dag_command.py    | 56 +++++++++++++
 3 files changed, 134 insertions(+), 29 deletions(-)

diff --git a/airflow-core/src/airflow/cli/cli_config.py b/airflow-core/src/airflow/cli/cli_config.py
index 1122033dc94..d094569c3c2 100644
--- a/airflow-core/src/airflow/cli/cli_config.py
+++ b/airflow-core/src/airflow/cli/cli_config.py
@@ -233,6 +233,13 @@ ARG_SKIP_SERVE_LOGS = Arg(
     action="store_true",
 )
 
+# list_dags
+ARG_LIST_LOCAL = Arg(
+    ("-l", "--local"),
+    action="store_true",
+    help="Shows local parsed DAGs and their import errors, ignores content serialized in DB",
+)
+
 # list_dag_runs
 ARG_NO_BACKFILL = Arg(
     ("--no-backfill",), help="filter all the backfill dagruns given the dag id", action="store_true"
@@ -959,13 +966,13 @@ DAGS_COMMANDS = (
         name="list",
         help="List all the DAGs",
         func=lazy_load_command("airflow.cli.commands.dag_command.dag_list_dags"),
-        args=(ARG_OUTPUT, ARG_VERBOSE, ARG_DAG_LIST_COLUMNS, ARG_BUNDLE_NAME),
+        args=(ARG_OUTPUT, ARG_VERBOSE, ARG_DAG_LIST_COLUMNS, ARG_BUNDLE_NAME, ARG_LIST_LOCAL),
     ),
     ActionCommand(
         name="list-import-errors",
         help="List all the DAGs that have import errors",
         func=lazy_load_command("airflow.cli.commands.dag_command.dag_list_import_errors"),
-        args=(ARG_BUNDLE_NAME, ARG_OUTPUT, ARG_VERBOSE),
+        args=(ARG_BUNDLE_NAME, ARG_OUTPUT, ARG_VERBOSE, ARG_LIST_LOCAL),
     ),
     ActionCommand(
         name="report",
diff --git a/airflow-core/src/airflow/cli/commands/dag_command.py b/airflow-core/src/airflow/cli/commands/dag_command.py
index 7ce8df9d41a..b1151f34091 100644
--- a/airflow-core/src/airflow/cli/commands/dag_command.py
+++ b/airflow-core/src/airflow/cli/commands/dag_command.py
@@ -230,10 +230,10 @@ def _get_dagbag_dag_details(dag: DAG) -> dict:
     return {
         "dag_id": dag.dag_id,
         "dag_display_name": dag.dag_display_name,
-        "bundle_name": dag.get_bundle_name(),
-        "bundle_version": dag.get_bundle_version(),
-        "is_paused": dag.get_is_paused(),
-        "is_stale": dag.get_is_stale(),
+        "bundle_name": dag.get_bundle_name() if hasattr(dag, "get_bundle_name") else None,
+        "bundle_version": dag.get_bundle_version() if hasattr(dag, "get_bundle_version") else None,
+        "is_paused": dag.get_is_paused() if hasattr(dag, "get_is_paused") else None,
+        "is_stale": dag.get_is_stale() if hasattr(dag, "get_is_stale") else None,
         "last_parsed_time": None,
         "last_expired": None,
         "relative_fileloc": dag.relative_fileloc,
@@ -343,15 +343,38 @@ def dag_list_dags(args, session: Session = NEW_SESSION) -> None:
             file=sys.stderr,
         )
 
-    dagbag = DagBag(read_dags_from_db=True)
-    dagbag.collect_dags_from_db()
+    dagbag_import_errors = 0
+    dags_list = []
+    if args.local:
+        # Get import errors from the local area
+        if args.bundle_name:
+            manager = DagBundlesManager()
+            validate_dag_bundle_arg(args.bundle_name)
+            all_bundles = list(manager.get_all_dag_bundles())
+            bundles_to_search = set(args.bundle_name)
+
+            for bundle in all_bundles:
+                if bundle.name in bundles_to_search:
+                    dagbag = DagBag(bundle.path, bundle_path=bundle.path)
+                    dagbag.collect_dags()
+                    dags_list.extend(list(dagbag.dags.values()))
+                    dagbag_import_errors += len(dagbag.import_errors)
+        else:
+            dagbag = DagBag()
+            dagbag.collect_dags()
+            dags_list.extend(list(dagbag.dags.values()))
+            dagbag_import_errors += len(dagbag.import_errors)
+    else:
+        # Get import errors from the DB
+        dagbag = DagBag(read_dags_from_db=True)
+        dagbag.collect_dags_from_db()
+        dags_list = list(dagbag.dags.values())
 
-    # Get import errors from the DB
-    query = select(func.count()).select_from(ParseImportError)
-    if args.bundle_name:
-        query = query.where(ParseImportError.bundle_name.in_(args.bundle_name))
+        query = select(func.count()).select_from(ParseImportError)
+        if args.bundle_name:
+            query = query.where(ParseImportError.bundle_name.in_(args.bundle_name))
 
-    dagbag_import_errors = session.scalar(query)
+        dagbag_import_errors = session.scalar(query)
 
     if dagbag_import_errors > 0:
         from rich import print as rich_print
@@ -382,7 +405,7 @@ def dag_list_dags(args, session: Session = NEW_SESSION) -> None:
 
     AirflowConsole().print_as(
         data=sorted(
-            filter_dags_by_bundle(list(dagbag.dags.values()), args.bundle_name),
+            filter_dags_by_bundle(dags_list, args.bundle_name if not args.local else None),
             key=operator.attrgetter("dag_id"),
         ),
         output=args.output,
@@ -420,22 +443,41 @@ def dag_list_import_errors(args, session: Session = NEW_SESSION) -> None:
     """Display dags with import errors on the command line."""
     data = []
 
-    # Get import errors from the DB
-    query = select(ParseImportError)
-    if args.bundle_name:
-        validate_dag_bundle_arg(args.bundle_name)
-        query = query.where(ParseImportError.bundle_name.in_(args.bundle_name))
-
-    dagbag_import_errors = session.scalars(query).all()
+    if args.local:
+        # Get import errors from local areas
+        if args.bundle_name:
+            manager = DagBundlesManager()
+            validate_dag_bundle_arg(args.bundle_name)
+            all_bundles = list(manager.get_all_dag_bundles())
+            bundles_to_search = set(args.bundle_name)
+
+            for bundle in all_bundles:
+                if bundle.name in bundles_to_search:
+                    dagbag = DagBag(bundle.path, bundle_path=bundle.path)
+                    for filename, errors in dagbag.import_errors.items():
+                        data.append({"bundle_name": bundle.name, "filepath": 
filename, "error": errors})
+        else:
+            dagbag = DagBag()
+            for filename, errors in dagbag.import_errors.items():
+                data.append({"filepath": filename, "error": errors})
 
-    for import_error in dagbag_import_errors:
-        data.append(
-            {
-                "bundle_name": import_error.bundle_name,
-                "filepath": import_error.filename,
-                "error": import_error.stacktrace,
-            }
-        )
+    else:
+        # Get import errors from the DB
+        query = select(ParseImportError)
+        if args.bundle_name:
+            validate_dag_bundle_arg(args.bundle_name)
+            query = query.where(ParseImportError.bundle_name.in_(args.bundle_name))
+
+        dagbag_import_errors = session.scalars(query).all()
+
+        for import_error in dagbag_import_errors:
+            data.append(
+                {
+                    "bundle_name": import_error.bundle_name,
+                    "filepath": import_error.filename,
+                    "error": import_error.stacktrace,
+                }
+            )
     AirflowConsole().print_as(
         data=data,
         output=args.output,
diff --git a/airflow-core/tests/unit/cli/commands/test_dag_command.py b/airflow-core/tests/unit/cli/commands/test_dag_command.py
index 65c87298a31..278248486bd 100644
--- a/airflow-core/tests/unit/cli/commands/test_dag_command.py
+++ b/airflow-core/tests/unit/cli/commands/test_dag_command.py
@@ -256,6 +256,42 @@ class TestCliDags:
             assert key in dag_list[0]
         assert any("airflow/example_dags/example_complex.py" in d["fileloc"] for d in dag_list)
 
+    @conf_vars({("core", "load_examples"): "true"})
+    def test_cli_list_local_dags(self):
+        # Clear the database
+        clear_db_dags()
+        args = self.parser.parse_args(["dags", "list", "--output", "json", "--local"])
+        with contextlib.redirect_stdout(StringIO()) as temp_stdout:
+            dag_command.dag_list_dags(args)
+            out = temp_stdout.getvalue()
+            dag_list = json.loads(out)
+        for key in ["dag_id", "fileloc", "owners", "is_paused"]:
+            assert key in dag_list[0]
+        assert any("airflow/example_dags/example_complex.py" in d["fileloc"] for d in dag_list)
+        # Rebuild Test DB for other tests
+        parse_and_sync_to_db(os.devnull, include_examples=True)
+
+    @conf_vars({("core", "load_examples"): "false"})
+    def test_cli_list_local_dags_with_bundle_name(self, configure_testing_dag_bundle):
+        # Clear the database
+        clear_db_dags()
+        path_to_parse = TEST_DAGS_FOLDER / "test_example_bash_operator.py"
+        args = self.parser.parse_args(
+            ["dags", "list", "--output", "json", "--local", "--bundle-name", "testing"]
+        )
+        with configure_testing_dag_bundle(path_to_parse):
+            with contextlib.redirect_stdout(StringIO()) as temp_stdout:
+                dag_command.dag_list_dags(args)
+                out = temp_stdout.getvalue()
+                dag_list = json.loads(out)
+            for key in ["dag_id", "fileloc", "owners", "is_paused"]:
+                assert key in dag_list[0]
+            assert any(
+                str(TEST_DAGS_FOLDER / "test_example_bash_operator.py") in d["fileloc"] for d in dag_list
+            )
+        # Rebuild Test DB for other tests
+        parse_and_sync_to_db(os.devnull, include_examples=True)
+
     @conf_vars({("core", "load_examples"): "true"})
     def test_cli_list_dags_custom_cols(self):
         args = self.parser.parse_args(
@@ -292,6 +328,26 @@ class TestCliDags:
 
         assert "Failed to load all files." in out
 
+    @conf_vars({("core", "load_examples"): "false"})
+    def test_cli_list_dags_prints_local_import_errors(self, configure_testing_dag_bundle, get_test_dag):
+        # Clear the database
+        clear_db_dags()
+        path_to_parse = TEST_DAGS_FOLDER / "test_invalid_cron.py"
+        get_test_dag("test_invalid_cron")
+
+        args = self.parser.parse_args(
+            ["dags", "list", "--output", "yaml", "--bundle-name", "testing", "--local"]
+        )
+
+        with configure_testing_dag_bundle(path_to_parse):
+            with contextlib.redirect_stderr(StringIO()) as temp_stderr:
+                dag_command.dag_list_dags(args)
+                out = temp_stderr.getvalue()
+
+        assert "Failed to load all files." in out
+        # Rebuild Test DB for other tests
+        parse_and_sync_to_db(os.devnull, include_examples=True)
+
     @conf_vars({("core", "load_examples"): "true"})
     @mock.patch("airflow.models.DagModel.get_dagmodel")
     def test_list_dags_none_get_dagmodel(self, mock_get_dagmodel):

Reply via email to