This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new b97197a9eea Add submodules to GitDagBundle (#59911)
b97197a9eea is described below

commit b97197a9eeaced39de02fa7c51b601afa962711e
Author: Ronaldo Campos <[email protected]>
AuthorDate: Mon Dec 29 22:32:26 2025 +0000

    Add submodules to GitDagBundle (#59911)
---
 providers/git/docs/bundles/index.rst               |   1 +
 .../git/src/airflow/providers/git/bundles/git.py   |  39 ++++++-
 providers/git/tests/unit/git/bundles/test_git.py   | 116 +++++++++++++++++++++
 3 files changed, 155 insertions(+), 1 deletion(-)

diff --git a/providers/git/docs/bundles/index.rst b/providers/git/docs/bundles/index.rst
index 30e6ade8989..f9e9efe9a17 100644
--- a/providers/git/docs/bundles/index.rst
+++ b/providers/git/docs/bundles/index.rst
@@ -35,6 +35,7 @@ Example of using the GitDagBundle:
              "subdir": "dags",
              "tracking_ref": "main",
              "refresh_interval": 3600
+             "submodules": False,
              "prune_dotgit_folder": True
          }
      }
diff --git a/providers/git/src/airflow/providers/git/bundles/git.py b/providers/git/src/airflow/providers/git/bundles/git.py
index 508efec76ca..b10fc0e7f94 100644
--- a/providers/git/src/airflow/providers/git/bundles/git.py
+++ b/providers/git/src/airflow/providers/git/bundles/git.py
@@ -45,6 +45,7 @@ class GitDagBundle(BaseDagBundle):
     :param subdir: Subdirectory within the repository where the DAGs are stored (Optional)
     :param git_conn_id: Connection ID for SSH/token based connection to the repository (Optional)
     :param repo_url: Explicit Git repository URL to override the connection's host. (Optional)
+    :param submodules: Whether to initialize git submodules. In case of submodules, the .git folder is preserved.
     :param prune_dotgit_folder: Remove .git folder from the versions after cloning.
 
         The per-version clone is not a full "git" copy (it makes use of git's `--local` ability
@@ -62,6 +63,7 @@ class GitDagBundle(BaseDagBundle):
         subdir: str | None = None,
         git_conn_id: str | None = None,
         repo_url: str | None = None,
+        submodules: bool = False,
         prune_dotgit_folder: bool = True,
         **kwargs,
     ) -> None:
@@ -75,7 +77,13 @@ class GitDagBundle(BaseDagBundle):
             self.repo_path = self.base_dir / "tracking_repo"
         self.git_conn_id = git_conn_id
         self.repo_url = repo_url
-        self.prune_dotgit_folder = prune_dotgit_folder
+        self.submodules = submodules
+
+        # Force prune to False if submodules are used, otherwise git links break
+        if self.submodules:
+            self.prune_dotgit_folder = False
+        else:
+            self.prune_dotgit_folder = prune_dotgit_folder
 
         self._log = log.bind(
             bundle_name=self.name,
@@ -84,6 +92,7 @@ class GitDagBundle(BaseDagBundle):
             repo_path=self.repo_path,
             versions_path=self.versions_dir,
             git_conn_id=self.git_conn_id,
+            submodules=self.submodules,
         )
 
         self._log.debug("bundle configured")
@@ -124,10 +133,20 @@ class GitDagBundle(BaseDagBundle):
                     self.repo.remotes.origin.fetch()
                 self.repo.head.set_reference(str(self.repo.commit(self.version)))
                 self.repo.head.reset(index=True, working_tree=True)
+
+                if self.submodules:
+                    cm_sub = self.hook.configure_hook_env() if self.hook else nullcontext()
+                    with cm_sub:
+                        try:
+                            self._fetch_submodules()
+                        except GitCommandError as e:
+                            raise RuntimeError("Error pulling submodule from repository") from e
+
                 if self.prune_dotgit_folder:
                     shutil.rmtree(self.repo_path / ".git")
             else:
                 self.refresh()
+
             self.repo.close()
 
     def initialize(self) -> None:
@@ -212,6 +231,7 @@ class GitDagBundle(BaseDagBundle):
             f"<GitDagBundle("
             f"name={self.name!r}, "
             f"tracking_ref={self.tracking_ref!r}, "
+            f"submodules={self.submodules!r}, "
             f"subdir={self.subdir!r}, "
             f"version={self.version!r}"
             f")>"
@@ -244,6 +264,16 @@ class GitDagBundle(BaseDagBundle):
             self.bare_repo.remotes.origin.fetch(refspecs)
             self.bare_repo.close()
 
+    @retry(
+        retry=retry_if_exception_type((GitCommandError,)),
+        stop=stop_after_attempt(2),
+        reraise=True,
+    )
+    def _fetch_submodules(self) -> None:
+        self._log.info("Initializing and updating submodules", repo_path=self.repo_path)
+        self.repo.git.submodule("sync", "--recursive")
+        self.repo.git.submodule("update", "--init", "--recursive", "--jobs", "1")
+
     def refresh(self) -> None:
         if self.version:
             raise AirflowException("Refreshing a specific version is not supported")
@@ -261,6 +291,13 @@ class GitDagBundle(BaseDagBundle):
                 else:
                     target = self.tracking_ref
                 self.repo.head.reset(target, index=True, working_tree=True)
+
+                if self.submodules:
+                    try:
+                        self._fetch_submodules()
+                    except GitCommandError as e:
+                        raise RuntimeError("Error pulling submodule from repository") from e
+
                 self.repo.close()
 
     @staticmethod
diff --git a/providers/git/tests/unit/git/bundles/test_git.py b/providers/git/tests/unit/git/bundles/test_git.py
index 413aa7737c4..ffeb3250bd7 100644
--- a/providers/git/tests/unit/git/bundles/test_git.py
+++ b/providers/git/tests/unit/git/bundles/test_git.py
@@ -860,3 +860,119 @@ class TestGitDagBundle:
 
             # Verify Repo was called twice (failed attempt + failed retry)
             assert mock_repo_class.call_count == 2
+
+    @mock.patch("airflow.providers.git.bundles.git.shutil.rmtree")
+    @mock.patch("airflow.providers.git.bundles.git.os.path.exists")
+    @mock.patch("airflow.providers.git.bundles.git.GitHook")
+    @mock.patch("airflow.providers.git.bundles.git.Repo")
+    def test_initialize_fetches_submodules_when_enabled(
+        self, mock_repo_class, mock_githook, mock_exists, mock_rmtree
+    ):
+        """Test that submodules are synced and updated when submodules=True during initialization."""
+        mock_githook.return_value.repo_url = "git@github.com:apache/airflow.git"
+
+        # Mock exists to return True so we skip the clone logic and go straight to initialization
+        mock_exists.return_value = True
+
+        mock_repo_instance = mock_repo_class.return_value
+        # Ensure _has_version returns True so we don't try to fetch origin
+        mock_repo_instance.commit.return_value = mock.MagicMock()
+
+        bundle = GitDagBundle(
+            name="test",
+            git_conn_id="git_default",
+            tracking_ref="main",
+            version="123456",
+            submodules=True,
+        )
+
+        bundle.initialize()
+
+        # Verify submodule commands were called
+        mock_repo_instance.git.submodule.assert_has_calls(
+            [mock.call("sync", "--recursive"), mock.call("update", "--init", "--recursive", "--jobs", "1")]
+        )
+        mock_rmtree.assert_not_called()
+
+    @mock.patch("airflow.providers.git.bundles.git.shutil.rmtree")
+    @mock.patch("airflow.providers.git.bundles.git.os.path.exists")
+    @mock.patch("airflow.providers.git.bundles.git.GitHook")
+    @mock.patch("airflow.providers.git.bundles.git.Repo")
+    def test_refresh_fetches_submodules_when_enabled(
+        self, mock_repo_class, mock_githook, mock_exists, mock_rmtree
+    ):
+        """Test that submodules are synced and updated when submodules=True during refresh."""
+        mock_githook.return_value.repo_url = "git@github.com:apache/airflow.git"
+        mock_exists.return_value = True
+
+        mock_repo_instance = mock_repo_class.return_value
+
+        bundle = GitDagBundle(
+            name="test",
+            git_conn_id="git_default",
+            tracking_ref="main",
+            submodules=True,
+        )
+
+        # Calling initialize without a specific version triggers refresh()
+        bundle.initialize()
+
+        # Verify submodule commands were called
+        mock_repo_instance.git.submodule.assert_has_calls(
+            [mock.call("sync", "--recursive"), mock.call("update", "--init", "--recursive", "--jobs", "1")]
+        )
+        mock_rmtree.assert_not_called()
+
+    @mock.patch("airflow.providers.git.bundles.git.shutil.rmtree")
+    @mock.patch("airflow.providers.git.bundles.git.os.path.exists")
+    @mock.patch("airflow.providers.git.bundles.git.GitHook")
+    @mock.patch("airflow.providers.git.bundles.git.Repo")
+    def test_submodules_disabled_by_default(self, mock_repo_class, mock_githook, mock_exists, mock_rmtree):
+        """Test that submodules are NOT fetched by default."""
+        mock_githook.return_value.repo_url = "git@github.com:apache/airflow.git"
+        mock_exists.return_value = True
+
+        mock_repo_instance = mock_repo_class.return_value
+
+        bundle = GitDagBundle(
+            name="test",
+            git_conn_id="git_default",
+            tracking_ref="main",
+            version="123456",
+            # submodules defaults to False
+        )
+
+        bundle.initialize()
+
+        # Ensure submodule commands were NOT called
+        mock_repo_instance.git.submodule.assert_not_called()
+
+    @mock.patch("airflow.providers.git.bundles.git.shutil.rmtree")
+    @mock.patch("airflow.providers.git.bundles.git.os.path.exists")
+    @mock.patch("airflow.providers.git.bundles.git.GitHook")
+    @mock.patch("airflow.providers.git.bundles.git.Repo")
+    def test_submodule_fetch_error_raises_runtime_error(
+        self, mock_repo_class, mock_githook, mock_exists, mock_rmtree
+    ):
+        """Test that a GitCommandError during submodule update is raised as a RuntimeError."""
+        mock_githook.return_value.repo_url = "git@github.com:apache/airflow.git"
+        mock_exists.return_value = True
+
+        mock_repo_instance = mock_repo_class.return_value
+        mock_repo_instance.commit.return_value = mock.MagicMock()
+
+        # Simulate a git error when running submodule update
+        mock_repo_instance.git.submodule.side_effect = GitCommandError("submodule update", "failed")
+
+        bundle = GitDagBundle(
+            name="test",
+            git_conn_id="git_default",
+            tracking_ref="main",
+            version="123456",
+            submodules=True,
+        )
+
+        with pytest.raises(RuntimeError, match="Error pulling submodule from repository"):
+            bundle.initialize()
+
+        mock_rmtree.assert_not_called()

Reply via email to