This is an automated email from the ASF dual-hosted git repository.
jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new aac80984ddc Add support for a subdir in GitDagBundle (#44582)
aac80984ddc is described below
commit aac80984ddc901f3ff48145bdb1aaced620ff26d
Author: Jed Cunningham <[email protected]>
AuthorDate: Mon Dec 2 16:24:14 2024 -0700
Add support for a subdir in GitDagBundle (#44582)
---
airflow/dag_processing/bundles/git.py | 18 ++++++++++++------
tests/dag_processing/test_dag_bundles.py | 19 +++++++++++++++++++
2 files changed, 31 insertions(+), 6 deletions(-)
diff --git a/airflow/dag_processing/bundles/git.py b/airflow/dag_processing/bundles/git.py
index 21513af2de0..2d6710d3eda 100644
--- a/airflow/dag_processing/bundles/git.py
+++ b/airflow/dag_processing/bundles/git.py
@@ -39,16 +39,21 @@ class GitDagBundle(BaseDagBundle):
:param repo_url: URL of the git repository
:param head: Branch or tag for this DAG bundle
+ :param subdir: Subdirectory within the repository where the DAGs are
stored (Optional)
"""
supports_versioning = True
- def __init__(self, *, repo_url: str, head: str, **kwargs) -> None:
+ def __init__(self, *, repo_url: str, head: str, subdir: str | None = None,
**kwargs) -> None:
super().__init__(**kwargs)
self.repo_url = repo_url
self.head = head
+ self.subdir = subdir
self.bare_repo_path = self._dag_bundle_root_storage_path / "git" /
self.name
+ self.repo_path = (
+ self._dag_bundle_root_storage_path / "git" / (self.name +
f"+{self.version or self.head}")
+ )
self._clone_bare_repo_if_required()
self._ensure_version_in_bare_repo()
self._clone_repo_if_required()
@@ -64,12 +69,12 @@ class GitDagBundle(BaseDagBundle):
self.refresh()
def _clone_repo_if_required(self) -> None:
- if not os.path.exists(self.path):
+ if not os.path.exists(self.repo_path):
Repo.clone_from(
url=self.bare_repo_path,
- to_path=self.path,
+ to_path=self.repo_path,
)
- self.repo = Repo(self.path)
+ self.repo = Repo(self.repo_path)
def _clone_bare_repo_if_required(self) -> None:
if not os.path.exists(self.bare_repo_path):
@@ -96,8 +101,9 @@ class GitDagBundle(BaseDagBundle):
@property
def path(self) -> Path:
- location = self.version or self.head
- return self._dag_bundle_root_storage_path / "git" /
f"{self.name}+{location}"
+ if self.subdir:
+ return self.repo_path / self.subdir
+ return self.repo_path
@staticmethod
def _has_version(repo: Repo, version: str) -> bool:
diff --git a/tests/dag_processing/test_dag_bundles.py b/tests/dag_processing/test_dag_bundles.py
index e66e6fba746..7489067bb82 100644
--- a/tests/dag_processing/test_dag_bundles.py
+++ b/tests/dag_processing/test_dag_bundles.py
@@ -192,3 +192,22 @@ class TestGitDagBundle:
with pytest.raises(AirflowException, match="Version not_found not
found in the repository"):
GitDagBundle(name="test", version="not_found", repo_url=repo_path,
head="master")
+
+ def test_subdir(self, git_repo):
+ repo_path, repo = git_repo
+
+ subdir = "somesubdir"
+ subdir_path = repo_path / subdir
+ subdir_path.mkdir()
+
+ file_path = subdir_path / "some_new_file.py"
+ with open(file_path, "w") as f:
+ f.write("hello world")
+ repo.index.add([file_path])
+ repo.index.commit("Initial commit")
+
+ bundle = GitDagBundle(name="test", repo_url=repo_path, head="master",
subdir=subdir)
+
+ files_in_repo = {f.name for f in bundle.path.iterdir() if f.is_file()}
+ assert str(bundle.path).endswith(subdir)
+ assert {"some_new_file.py"} == files_in_repo