This is an automated email from the ASF dual-hosted git repository.
mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 17e60b0a2b fix: Use prefixes instead of all file paths for OpenLineage
datasets in GCSDeleteObjectsOperator (#39059)
17e60b0a2b is described below
commit 17e60b0a2b640a6974eeecca0765e600817cd097
Author: Kacper Muda <[email protected]>
AuthorDate: Thu Apr 18 17:12:48 2024 +0200
fix: Use prefixes instead of all file paths for OpenLineage datasets in
GCSDeleteObjectsOperator (#39059)
Signed-off-by: Kacper Muda <[email protected]>
---
airflow/providers/google/cloud/operators/gcs.py | 34 +++++++-----
tests/providers/google/cloud/operators/test_gcs.py | 61 +++++++++++++---------
2 files changed, 58 insertions(+), 37 deletions(-)
diff --git a/airflow/providers/google/cloud/operators/gcs.py
b/airflow/providers/google/cloud/operators/gcs.py
index 6c72378a43..f4c8ce932e 100644
--- a/airflow/providers/google/cloud/operators/gcs.py
+++ b/airflow/providers/google/cloud/operators/gcs.py
@@ -297,7 +297,7 @@ class GCSDeleteObjectsOperator(GoogleCloudBaseOperator):
*,
bucket_name: str,
objects: list[str] | None = None,
- prefix: str | None = None,
+ prefix: str | list[str] | None = None,
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
**kwargs,
@@ -309,12 +309,14 @@ class GCSDeleteObjectsOperator(GoogleCloudBaseOperator):
self.impersonation_chain = impersonation_chain
if objects is None and prefix is None:
- err_message = "(Task {task_id}) Either object or prefix should be
set. Both are None.".format(
+ err_message = "(Task {task_id}) Either objects or prefix should be
set. Both are None.".format(
**kwargs
)
raise ValueError(err_message)
+ if objects is not None and prefix is not None:
+ err_message = "(Task {task_id}) Objects or prefix should be set.
Both provided.".format(**kwargs)
+ raise ValueError(err_message)
- self._objects: list[str] = []
super().__init__(**kwargs)
def execute(self, context: Context) -> None:
@@ -324,15 +326,14 @@ class GCSDeleteObjectsOperator(GoogleCloudBaseOperator):
)
if self.objects is not None:
- self._objects = self.objects
+ objects = self.objects
else:
- self._objects = hook.list(bucket_name=self.bucket_name,
prefix=self.prefix)
- self.log.info("Deleting %s objects from %s", len(self._objects),
self.bucket_name)
- for object_name in self._objects:
+ objects = hook.list(bucket_name=self.bucket_name,
prefix=self.prefix)
+ self.log.info("Deleting %s objects from %s", len(objects),
self.bucket_name)
+ for object_name in objects:
hook.delete(bucket_name=self.bucket_name, object_name=object_name)
- def get_openlineage_facets_on_complete(self, task_instance):
- """Implement on_complete as execute() resolves object names."""
+ def get_openlineage_facets_on_start(self):
from openlineage.client.facet import (
LifecycleStateChange,
LifecycleStateChangeDatasetFacet,
@@ -342,8 +343,17 @@ class GCSDeleteObjectsOperator(GoogleCloudBaseOperator):
from airflow.providers.openlineage.extractors import OperatorLineage
- if not self._objects:
- return OperatorLineage()
+ objects = []
+ if self.objects is not None:
+ objects = self.objects
+ elif self.prefix is not None:
+ prefixes = [self.prefix] if isinstance(self.prefix, str) else
self.prefix
+ for pref in prefixes:
+ # Use parent if not a file (dot not in name) and not a dir
(ends with slash)
+ if "." not in pref.split("/")[-1] and not pref.endswith("/"):
+ pref = Path(pref).parent.as_posix()
+ pref = "/" if pref in (".", "", "/") else pref.rstrip("/")
+ objects.append(pref)
bucket_url = f"gs://{self.bucket_name}"
input_datasets = [
@@ -360,7 +370,7 @@ class GCSDeleteObjectsOperator(GoogleCloudBaseOperator):
)
},
)
- for object_name in self._objects
+ for object_name in objects
]
return OperatorLineage(inputs=input_datasets)
diff --git a/tests/providers/google/cloud/operators/test_gcs.py
b/tests/providers/google/cloud/operators/test_gcs.py
index 6236aa5f23..0024ad6407 100644
--- a/tests/providers/google/cloud/operators/test_gcs.py
+++ b/tests/providers/google/cloud/operators/test_gcs.py
@@ -172,48 +172,59 @@ class TestGCSDeleteObjectsOperator:
any_order=True,
)
- @mock.patch("airflow.providers.google.cloud.operators.gcs.GCSHook")
- def test_get_openlineage_facets_on_complete(self, mock_hook):
+ @pytest.mark.parametrize(
+ ("objects", "prefix", "inputs"),
+ (
+ (["folder/a.txt", "b.json"], None, ["folder/a.txt", "b.json"]),
+ (["folder/a.txt", "folder/b.json"], None, ["folder/a.txt",
"folder/b.json"]),
+ (None, ["folder/a.txt", "b.json"], ["folder/a.txt", "b.json"]),
+ (None, "dir/pre", ["dir"]),
+ (None, ["dir/"], ["dir"]),
+ (None, "", ["/"]),
+ (None, "/", ["/"]),
+ (None, "pre", ["/"]),
+ (None, "dir/pre*", ["dir"]),
+ (None, "*", ["/"]),
+ ),
+ ids=(
+ "objects",
+ "multiple objects in the same dir",
+ "objects as prefixes",
+ "directory with prefix",
+ "directory",
+ "empty prefix",
+ "slash as prefix",
+ "prefix with no ending slash",
+ "directory with prefix with wildcard",
+ "just wildcard",
+ ),
+ )
+ def test_get_openlineage_facets_on_start(self, objects, prefix, inputs):
bucket_url = f"gs://{TEST_BUCKET}"
expected_inputs = [
Dataset(
namespace=bucket_url,
- name="folder/a.txt",
+ name=name,
facets={
"lifecycleStateChange": LifecycleStateChangeDatasetFacet(
lifecycleStateChange=LifecycleStateChange.DROP.value,
previousIdentifier=LifecycleStateChangeDatasetFacetPreviousIdentifier(
namespace=bucket_url,
- name="folder/a.txt",
+ name=name,
),
)
},
- ),
- Dataset(
- namespace=bucket_url,
- name="b.txt",
- facets={
- "lifecycleStateChange": LifecycleStateChangeDatasetFacet(
- lifecycleStateChange=LifecycleStateChange.DROP.value,
-
previousIdentifier=LifecycleStateChangeDatasetFacetPreviousIdentifier(
- namespace=bucket_url,
- name="b.txt",
- ),
- )
- },
- ),
+ )
+ for name in inputs
]
operator = GCSDeleteObjectsOperator(
- task_id=TASK_ID, bucket_name=TEST_BUCKET, objects=["folder/a.txt",
"b.txt"]
+ task_id=TASK_ID, bucket_name=TEST_BUCKET, objects=objects,
prefix=prefix
)
-
- operator.execute(None)
-
- lineage = operator.get_openlineage_facets_on_complete(None)
- assert len(lineage.inputs) == 2
+ lineage = operator.get_openlineage_facets_on_start()
+ assert len(lineage.inputs) == len(inputs)
assert len(lineage.outputs) == 0
- assert lineage.inputs == expected_inputs
+ assert sorted(lineage.inputs) == sorted(expected_inputs)
class TestGoogleCloudStorageListOperator: