This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new aac80984ddc Add support for a subdir in GitDagBundle (#44582)
aac80984ddc is described below

commit aac80984ddc901f3ff48145bdb1aaced620ff26d
Author: Jed Cunningham <[email protected]>
AuthorDate: Mon Dec 2 16:24:14 2024 -0700

    Add support for a subdir in GitDagBundle (#44582)
---
 airflow/dag_processing/bundles/git.py    | 18 ++++++++++++------
 tests/dag_processing/test_dag_bundles.py | 19 +++++++++++++++++++
 2 files changed, 31 insertions(+), 6 deletions(-)

diff --git a/airflow/dag_processing/bundles/git.py b/airflow/dag_processing/bundles/git.py
index 21513af2de0..2d6710d3eda 100644
--- a/airflow/dag_processing/bundles/git.py
+++ b/airflow/dag_processing/bundles/git.py
@@ -39,16 +39,21 @@ class GitDagBundle(BaseDagBundle):
 
     :param repo_url: URL of the git repository
     :param head: Branch or tag for this DAG bundle
+    :param subdir: Subdirectory within the repository where the DAGs are stored (Optional)
     """
 
     supports_versioning = True
 
-    def __init__(self, *, repo_url: str, head: str, **kwargs) -> None:
+    def __init__(self, *, repo_url: str, head: str, subdir: str | None = None, **kwargs) -> None:
         super().__init__(**kwargs)
         self.repo_url = repo_url
         self.head = head
+        self.subdir = subdir
 
         self.bare_repo_path = self._dag_bundle_root_storage_path / "git" / self.name
+        self.repo_path = (
+            self._dag_bundle_root_storage_path / "git" / (self.name + f"+{self.version or self.head}")
+        )
         self._clone_bare_repo_if_required()
         self._ensure_version_in_bare_repo()
         self._clone_repo_if_required()
@@ -64,12 +69,12 @@ class GitDagBundle(BaseDagBundle):
             self.refresh()
 
     def _clone_repo_if_required(self) -> None:
-        if not os.path.exists(self.path):
+        if not os.path.exists(self.repo_path):
             Repo.clone_from(
                 url=self.bare_repo_path,
-                to_path=self.path,
+                to_path=self.repo_path,
             )
-        self.repo = Repo(self.path)
+        self.repo = Repo(self.repo_path)
 
     def _clone_bare_repo_if_required(self) -> None:
         if not os.path.exists(self.bare_repo_path):
@@ -96,8 +101,9 @@ class GitDagBundle(BaseDagBundle):
 
     @property
     def path(self) -> Path:
-        location = self.version or self.head
-        return self._dag_bundle_root_storage_path / "git" / f"{self.name}+{location}"
+        if self.subdir:
+            return self.repo_path / self.subdir
+        return self.repo_path
 
     @staticmethod
     def _has_version(repo: Repo, version: str) -> bool:
diff --git a/tests/dag_processing/test_dag_bundles.py b/tests/dag_processing/test_dag_bundles.py
index e66e6fba746..7489067bb82 100644
--- a/tests/dag_processing/test_dag_bundles.py
+++ b/tests/dag_processing/test_dag_bundles.py
@@ -192,3 +192,22 @@ class TestGitDagBundle:
 
         with pytest.raises(AirflowException, match="Version not_found not found in the repository"):
             GitDagBundle(name="test", version="not_found", repo_url=repo_path, head="master")
+
+    def test_subdir(self, git_repo):
+        repo_path, repo = git_repo
+
+        subdir = "somesubdir"
+        subdir_path = repo_path / subdir
+        subdir_path.mkdir()
+
+        file_path = subdir_path / "some_new_file.py"
+        with open(file_path, "w") as f:
+            f.write("hello world")
+        repo.index.add([file_path])
+        repo.index.commit("Initial commit")
+
+        bundle = GitDagBundle(name="test", repo_url=repo_path, head="master", subdir=subdir)
+
+        files_in_repo = {f.name for f in bundle.path.iterdir() if f.is_file()}
+        assert str(bundle.path).endswith(subdir)
+        assert {"some_new_file.py"} == files_in_repo

Reply via email to