This is an automated email from the ASF dual-hosted git repository.
mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 927e3643c2 fix: Use prefixes instead of full file paths for OpenLineage datasets in GCSToGCSOperator (#39058)
927e3643c2 is described below
commit 927e3643c2f901a3ac85f8dc94541ba83b3c6755
Author: Kacper Muda <[email protected]>
AuthorDate: Thu Apr 18 16:11:14 2024 +0200
fix: Use prefixes instead of full file paths for OpenLineage datasets in GCSToGCSOperator (#39058)
Signed-off-by: Kacper Muda <[email protected]>
---
.../google/cloud/transfers/gcs_to_bigquery.py | 8 +-
.../providers/google/cloud/transfers/gcs_to_gcs.py | 34 ++--
.../google/cloud/transfers/test_gcs_to_gcs.py | 207 ++++++++++++++-------
3 files changed, 165 insertions(+), 84 deletions(-)
diff --git a/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py b/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
index 03aefcb8ad..3899048dc4 100644
--- a/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
+++ b/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
@@ -749,7 +749,6 @@ class GCSToBigQueryOperator(BaseOperator):
)
from openlineage.client.run import Dataset
- from airflow.providers.google.cloud.hooks.gcs import _parse_gcs_url
from airflow.providers.google.cloud.utils.openlineage import (
get_facets_from_bq_table,
get_identity_column_lineage_facet,
@@ -766,8 +765,7 @@ class GCSToBigQueryOperator(BaseOperator):
"schema": output_dataset_facets["schema"],
}
input_datasets = []
- for uri in sorted(self.source_uris):
- bucket, blob = _parse_gcs_url(uri)
+ for blob in sorted(self.source_objects):
additional_facets = {}
if "*" in blob:
@@ -777,7 +775,7 @@ class GCSToBigQueryOperator(BaseOperator):
"symlink": SymlinksDatasetFacet(
identifiers=[
SymlinksDatasetFacetIdentifiers(
- namespace=f"gs://{bucket}", name=blob,
type="file"
+ namespace=f"gs://{self.bucket}", name=blob,
type="file"
)
]
),
@@ -788,7 +786,7 @@ class GCSToBigQueryOperator(BaseOperator):
blob = "/"
dataset = Dataset(
- namespace=f"gs://{bucket}",
+ namespace=f"gs://{self.bucket}",
name=blob,
facets=merge_dicts(input_dataset_facets, additional_facets),
)
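In effect, the input dataset identity for the BigQuery transfer now comes straight from the operator's own bucket and object list instead of re-parsing full gs:// URIs. A minimal sketch of that difference, with hypothetical example values:

    # Before: the bucket was re-parsed from each full source URI, e.g.
    #   _parse_gcs_url("gs://my-bucket/data/file.csv") -> ("my-bucket", "data/file.csv")
    # After: self.bucket and self.source_objects are used directly.
    bucket = "my-bucket"                    # hypothetical example value
    source_objects = ["data/file.csv"]      # hypothetical example value
    datasets = [(f"gs://{bucket}", blob) for blob in sorted(source_objects)]
    # -> [("gs://my-bucket", "data/file.csv")]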
diff --git a/airflow/providers/google/cloud/transfers/gcs_to_gcs.py b/airflow/providers/google/cloud/transfers/gcs_to_gcs.py
index 8d07dca58b..0b3d330b65 100644
--- a/airflow/providers/google/cloud/transfers/gcs_to_gcs.py
+++ b/airflow/providers/google/cloud/transfers/gcs_to_gcs.py
@@ -234,8 +234,6 @@ class GCSToGCSOperator(BaseOperator):
self.source_object_required = source_object_required
self.exact_match = exact_match
self.match_glob = match_glob
- self.resolved_source_objects: set[str] = set()
- self.resolved_target_objects: set[str] = set()
def execute(self, context: Context):
hook = GCSHook(
@@ -540,13 +538,6 @@ class GCSToGCSOperator(BaseOperator):
self.destination_bucket,
destination_object,
)
-
- self.resolved_source_objects.add(source_object)
- if not destination_object:
- self.resolved_target_objects.add(source_object)
- else:
- self.resolved_target_objects.add(destination_object)
-
hook.rewrite(self.source_bucket, source_object, self.destination_bucket, destination_object)
if self.move_object:
@@ -559,17 +550,36 @@ class GCSToGCSOperator(BaseOperator):
This means we won't have to normalize self.source_object and self.source_objects,
destination bucket and so on.
"""
+ from pathlib import Path
+
from openlineage.client.run import Dataset
from airflow.providers.openlineage.extractors import OperatorLineage
+ def _process_prefix(pref):
+ if WILDCARD in pref:
+ pref = pref.split(WILDCARD)[0]
+ # Use parent if not a file (dot not in name) and not a dir (ends with slash)
+ if "." not in pref.split("/")[-1] and not pref.endswith("/"):
+ pref = Path(pref).parent.as_posix()
+ return ["/" if pref in ("", "/", ".") else pref.rstrip("/")] #
Adjust root path
+
+ inputs = []
+ for prefix in self.source_objects:
+ result = _process_prefix(prefix)
+ inputs.extend(result)
+
+ if self.destination_object is None:
+ outputs = inputs.copy()
+ else:
+ outputs = _process_prefix(self.destination_object)
+
return OperatorLineage(
inputs=[
- Dataset(namespace=f"gs://{self.source_bucket}", name=source)
- for source in sorted(self.resolved_source_objects)
+ Dataset(namespace=f"gs://{self.source_bucket}", name=source)
for source in sorted(set(inputs))
],
outputs=[
Dataset(namespace=f"gs://{self.destination_bucket}",
name=target)
- for target in sorted(self.resolved_target_objects)
+ for target in sorted(set(outputs))
],
)
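For context, a standalone sketch of the prefix normalization introduced above. It mirrors the _process_prefix helper from the diff; the example prefixes in the asserts are hypothetical:

    from pathlib import Path

    WILDCARD = "*"

    def _process_prefix(pref):
        if WILDCARD in pref:
            pref = pref.split(WILDCARD)[0]
        # Use parent if not a file (dot not in name) and not a dir (ends with slash)
        if "." not in pref.split("/")[-1] and not pref.endswith("/"):
            pref = Path(pref).parent.as_posix()
        return ["/" if pref in ("", "/", ".") else pref.rstrip("/")]  # Adjust root path

    assert _process_prefix("dir/file.txt") == ["dir/file.txt"]  # concrete file path kept as-is
    assert _process_prefix("dir/pre_*.csv") == ["dir"]          # wildcard reduced to its directory
    assert _process_prefix("dir/") == ["dir"]                   # trailing slash stripped
    assert _process_prefix("") == ["/"]                         # empty prefix means bucket root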
diff --git a/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py b/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py
index f961038f5d..97e16e7366 100644
--- a/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py
+++ b/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py
@@ -23,7 +23,7 @@ from unittest import mock
import pytest
from openlineage.client.run import Dataset
-from airflow.exceptions import AirflowException
+from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
from airflow.providers.google.cloud.transfers.gcs_to_gcs import WILDCARD, GCSToGCSOperator
TASK_ID = "test-gcs-to-gcs-operator"
@@ -829,74 +829,147 @@ class TestGoogleCloudStorageToCloudStorageOperator:
]
mock_hook.return_value.rewrite.assert_has_calls(mock_calls)
+ @pytest.mark.parametrize(
+ ("source_objects", "destination_object", "inputs", "outputs"),
+ (
+ (
+ SOURCE_OBJECTS_SINGLE_FILE,
+ None,
+ [Dataset(namespace=f"gs://{TEST_BUCKET}",
name=SOURCE_OBJECTS_SINGLE_FILE[0])],
+ [Dataset(namespace=f"gs://{DESTINATION_BUCKET}",
name=SOURCE_OBJECTS_SINGLE_FILE[0])],
+ ),
+ (
+ SOURCE_OBJECTS_SINGLE_FILE,
+ "target.txt",
+ [Dataset(namespace=f"gs://{TEST_BUCKET}",
name=SOURCE_OBJECTS_SINGLE_FILE[0])],
+ [Dataset(namespace=f"gs://{DESTINATION_BUCKET}",
name="target.txt")],
+ ),
+ (
+ SOURCE_OBJECTS_SINGLE_FILE,
+ "target_pre",
+ [Dataset(namespace=f"gs://{TEST_BUCKET}",
name=SOURCE_OBJECTS_SINGLE_FILE[0])],
+ [Dataset(namespace=f"gs://{DESTINATION_BUCKET}", name="/")],
+ ),
+ (
+ SOURCE_OBJECTS_SINGLE_FILE,
+ "dir/",
+ [Dataset(namespace=f"gs://{TEST_BUCKET}",
name=SOURCE_OBJECTS_SINGLE_FILE[0])],
+ [Dataset(namespace=f"gs://{DESTINATION_BUCKET}", name="dir")],
+ ),
+ (
+ SOURCE_OBJECTS_LIST,
+ "",
+ [
+ Dataset(namespace=f"gs://{TEST_BUCKET}",
name=SOURCE_OBJECTS_LIST[0]),
+ Dataset(namespace=f"gs://{TEST_BUCKET}",
name=SOURCE_OBJECTS_LIST[1]),
+ Dataset(namespace=f"gs://{TEST_BUCKET}",
name=SOURCE_OBJECTS_LIST[2]),
+ ],
+ [
+ Dataset(namespace=f"gs://{DESTINATION_BUCKET}", name="/"),
+ ],
+ ),
+ (
+ [*SOURCE_OBJECTS_LIST, "dir/*"],
+ "parent/pre_",
+ [
+ Dataset(namespace=f"gs://{TEST_BUCKET}",
name=SOURCE_OBJECTS_LIST[0]),
+ Dataset(namespace=f"gs://{TEST_BUCKET}",
name=SOURCE_OBJECTS_LIST[1]),
+ Dataset(namespace=f"gs://{TEST_BUCKET}",
name=SOURCE_OBJECTS_LIST[2]),
+ Dataset(namespace=f"gs://{TEST_BUCKET}", name="dir"),
+ ],
+ [Dataset(namespace=f"gs://{DESTINATION_BUCKET}",
name="parent")],
+ ),
+ (
+ SOURCE_OBJECTS_NO_FILE,
+ "no_ending_slash",
+ [Dataset(namespace=f"gs://{TEST_BUCKET}", name="/")],
+ [Dataset(namespace=f"gs://{DESTINATION_BUCKET}", name="/")],
+ ),
+ (
+ [
+ f"dir/{SOURCE_OBJECT_WILDCARD_PREFIX}",
+ f"dir/{SOURCE_OBJECT_WILDCARD_SUFFIX}",
+ f"dir/{SOURCE_OBJECT_WILDCARD_MIDDLE}",
+ f"dir/{SOURCE_OBJECT_WILDCARD_FILENAME}",
+ "dir/*",
+ "dir/",
+ "dir/pre_",
+ ],
+ "/",
+ [
+ Dataset(namespace=f"gs://{TEST_BUCKET}", name="dir"),
+ ],
+ [
+ Dataset(namespace=f"gs://{DESTINATION_BUCKET}", name="/"),
+ ],
+ ),
+ (
+ ["", "dir/pre", SOURCE_OBJECTS_SINGLE_FILE[0]],
+ DESTINATION_OBJECT,
+ [
+ Dataset(namespace=f"gs://{TEST_BUCKET}", name="/"),
+ Dataset(namespace=f"gs://{TEST_BUCKET}", name="dir"),
+ Dataset(namespace=f"gs://{TEST_BUCKET}",
name=SOURCE_OBJECTS_SINGLE_FILE[0]),
+ ],
+ [
+ Dataset(namespace=f"gs://{DESTINATION_BUCKET}",
name=DESTINATION_OBJECT_PREFIX),
+ ],
+ ),
+ (
+ [
+ "",
+ "dir/",
+ ],
+ None,
+ [
+ Dataset(namespace=f"gs://{TEST_BUCKET}", name="/"),
+ Dataset(namespace=f"gs://{TEST_BUCKET}", name="dir"),
+ ],
+ [
+ Dataset(namespace=f"gs://{DESTINATION_BUCKET}", name="/"),
+ Dataset(namespace=f"gs://{DESTINATION_BUCKET}",
name="dir"),
+ ],
+ ),
+ ),
+ ids=(
+ "single file without output",
+ "single file with single file output",
+ "single file with prefix output",
+ "single file with dir output",
+ "multiple file with empty output",
+ "multiple file with prefix as output",
+ "empty prefix with prefix as output",
+ "directory + prefix or wildcard without output",
+ "mixed prefixes and file paths with output dir",
+ "empty prefix + directory without output",
+ ),
+ )
@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook")
- def test_execute_simple_reports_openlineage(self, mock_hook):
- operator = GCSToGCSOperator(
- task_id=TASK_ID,
- source_bucket=TEST_BUCKET,
- source_object=SOURCE_OBJECTS_SINGLE_FILE[0],
- destination_bucket=DESTINATION_BUCKET,
- )
-
- operator.execute(None)
-
- lineage = operator.get_openlineage_facets_on_complete(None)
- assert len(lineage.inputs) == 1
- assert len(lineage.outputs) == 1
- assert lineage.inputs[0] == Dataset(
- namespace=f"gs://{TEST_BUCKET}", name=SOURCE_OBJECTS_SINGLE_FILE[0]
- )
- assert lineage.outputs[0] == Dataset(
- namespace=f"gs://{DESTINATION_BUCKET}", name=SOURCE_OBJECTS_SINGLE_FILE[0]
- )
-
- @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook")
- def test_execute_multiple_reports_openlineage(self, mock_hook):
- operator = GCSToGCSOperator(
- task_id=TASK_ID,
- source_bucket=TEST_BUCKET,
- source_objects=SOURCE_OBJECTS_LIST,
- destination_bucket=DESTINATION_BUCKET,
- destination_object=DESTINATION_OBJECT,
- )
-
- operator.execute(None)
-
- lineage = operator.get_openlineage_facets_on_complete(None)
- assert len(lineage.inputs) == 3
- assert len(lineage.outputs) == 1
- assert lineage.inputs == [
- Dataset(namespace=f"gs://{TEST_BUCKET}",
name=SOURCE_OBJECTS_LIST[0]),
- Dataset(namespace=f"gs://{TEST_BUCKET}",
name=SOURCE_OBJECTS_LIST[1]),
- Dataset(namespace=f"gs://{TEST_BUCKET}",
name=SOURCE_OBJECTS_LIST[2]),
- ]
- assert lineage.outputs[0] == Dataset(namespace=f"gs://{DESTINATION_BUCKET}", name=DESTINATION_OBJECT)
-
- @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook")
- def test_execute_wildcard_reports_openlineage(self, mock_hook):
- mock_hook.return_value.list.return_value = [
- "test_object1.txt",
- "test_object2.txt",
- ]
-
- operator = GCSToGCSOperator(
- task_id=TASK_ID,
- source_bucket=TEST_BUCKET,
- source_object=SOURCE_OBJECT_WILDCARD_SUFFIX,
- destination_bucket=DESTINATION_BUCKET,
- destination_object=DESTINATION_OBJECT,
- )
+ def test_get_openlineage_facets_on_complete(
+ self, mock_hook, source_objects, destination_object, inputs, outputs
+ ):
+ if source_objects and any(WILDCARD in obj for obj in source_objects):
+ with pytest.warns(AirflowProviderDeprecationWarning, match="Usage of wildcard"):
+ operator = GCSToGCSOperator(
+ task_id=TASK_ID,
+ source_bucket=TEST_BUCKET,
+ source_objects=source_objects,
+ destination_bucket=DESTINATION_BUCKET,
+ destination_object=destination_object,
+ )
+ else:
+ operator = GCSToGCSOperator(
+ task_id=TASK_ID,
+ source_bucket=TEST_BUCKET,
+ source_objects=source_objects,
+ destination_bucket=DESTINATION_BUCKET,
+ destination_object=destination_object,
+ )
operator.execute(None)
lineage = operator.get_openlineage_facets_on_complete(None)
- assert len(lineage.inputs) == 2
- assert len(lineage.outputs) == 2
- assert lineage.inputs == [
- Dataset(namespace=f"gs://{TEST_BUCKET}", name="test_object1.txt"),
- Dataset(namespace=f"gs://{TEST_BUCKET}", name="test_object2.txt"),
- ]
- assert lineage.outputs == [
- Dataset(namespace=f"gs://{DESTINATION_BUCKET}",
name="foo/bar/1.txt"),
- Dataset(namespace=f"gs://{DESTINATION_BUCKET}",
name="foo/bar/2.txt"),
- ]
+ assert len(lineage.inputs) == len(inputs)
+ assert len(lineage.outputs) == len(outputs)
+ assert sorted(lineage.inputs) == sorted(inputs)
+ assert sorted(lineage.outputs) == sorted(outputs)
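The new parametrized cases can be run in isolation with a standard pytest keyword filter, e.g.:

    pytest tests/providers/google/cloud/transfers/test_gcs_to_gcs.py -k test_get_openlineage_facets_on_complete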