This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 4dc0883d055 Return files destination URIs in `GoogleDriveToGCSOperator` and `GoogleSheetsToGCSOperator` (#61347)
4dc0883d055 is described below
commit 4dc0883d0559afee56e16122c90f0a70d406ddfa
Author: Ajay9704 <[email protected]>
AuthorDate: Thu Feb 26 16:28:53 2026 +0530
Return files destination URIs in `GoogleDriveToGCSOperator` and
`GoogleSheetsToGCSOperator` (#61347)
* fix to gcs
* 1 try to fix checks
* Update test_sheets_to_gcs.py
* Update test_sheets_to_gcs.py
* Update test_sheets_to_gcs.py
* fixes to feedback
---
.../google/cloud/transfers/gdrive_to_gcs.py | 7 ++-
.../google/cloud/transfers/sheets_to_gcs.py | 22 ++++++-
.../google/cloud/transfers/test_gdrive_to_gcs.py | 4 +-
.../google/cloud/transfers/test_sheets_to_gcs.py | 73 +++++++++++++++++++++-
4 files changed, 100 insertions(+), 6 deletions(-)
diff --git a/providers/google/src/airflow/providers/google/cloud/transfers/gdrive_to_gcs.py b/providers/google/src/airflow/providers/google/cloud/transfers/gdrive_to_gcs.py
index ee0f474c420..6cd1808c79c 100644
--- a/providers/google/src/airflow/providers/google/cloud/transfers/gdrive_to_gcs.py
+++ b/providers/google/src/airflow/providers/google/cloud/transfers/gdrive_to_gcs.py
@@ -83,7 +83,7 @@ class GoogleDriveToGCSOperator(BaseOperator):
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
- def execute(self, context: Context):
+ def execute(self, context: Context) -> list[str]:
gdrive_hook = GoogleDriveHook(
gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain,
@@ -100,6 +100,11 @@ class GoogleDriveToGCSOperator(BaseOperator):
) as file:
gdrive_hook.download_file(file_id=file_metadata["id"], file_handle=file)
+ gcs_uri = f"gs://{self.bucket_name}/{self.object_name}"
+ result = [gcs_uri]
+
+ return result
+
def dry_run(self):
"""Perform a dry run of the operator."""
return None
diff --git a/providers/google/src/airflow/providers/google/cloud/transfers/sheets_to_gcs.py b/providers/google/src/airflow/providers/google/cloud/transfers/sheets_to_gcs.py
index 25801a54066..9f8fd02adf2 100644
--- a/providers/google/src/airflow/providers/google/cloud/transfers/sheets_to_gcs.py
+++ b/providers/google/src/airflow/providers/google/cloud/transfers/sheets_to_gcs.py
@@ -53,6 +53,9 @@ class GoogleSheetsToGCSOperator(BaseOperator):
If set as a sequence, the identities from the list must grant
Service Account Token Creator IAM role to the directly preceding identity, with first
account from the list granting this role to the originating account (templated).
+ :param return_gcs_uris: If True, returns full GCS URIs (e.g., ``gs://bucket/path/file.csv``).
+ If False (default), returns object names only (e.g., ``path/to/file.csv``).
+ Default will change to True in a future release.
"""
template_fields: Sequence[str] = (
@@ -72,6 +75,7 @@ class GoogleSheetsToGCSOperator(BaseOperator):
destination_path: str | None = None,
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
+ return_gcs_uris: bool = False,
**kwargs,
) -> None:
super().__init__(**kwargs)
@@ -81,6 +85,16 @@ class GoogleSheetsToGCSOperator(BaseOperator):
self.destination_bucket = destination_bucket
self.destination_path = destination_path
self.impersonation_chain = impersonation_chain
+ self.return_gcs_uris = return_gcs_uris
+ if not self.return_gcs_uris:
+ import warnings
+
+ warnings.warn(
+ "The default value of return_gcs_uris will change from False to True in a future release. "
+ "Please set return_gcs_uris explicitly to avoid this warning.",
+ FutureWarning,
+ stacklevel=2,
+ )
def _upload_data(
self,
@@ -110,7 +124,7 @@ class GoogleSheetsToGCSOperator(BaseOperator):
)
return dest_file_name
- def execute(self, context: Context):
+ def execute(self, context: Context) -> list[str]:
sheet_hook = GSheetsHook(
gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain,
@@ -128,7 +142,11 @@ class GoogleSheetsToGCSOperator(BaseOperator):
for sheet_range in sheet_titles:
data = sheet_hook.get_values(spreadsheet_id=self.spreadsheet_id, range_=sheet_range)
gcs_path_to_file = self._upload_data(gcs_hook, sheet_hook, sheet_range, data)
- destination_array.append(gcs_path_to_file)
+ if self.return_gcs_uris:
+ gcs_uri = f"gs://{self.destination_bucket}/{gcs_path_to_file}"
+ destination_array.append(gcs_uri)
+ else:
+ destination_array.append(gcs_path_to_file)
context["ti"].xcom_push(key="destination_objects", value=destination_array)
return destination_array
diff --git a/providers/google/tests/unit/google/cloud/transfers/test_gdrive_to_gcs.py b/providers/google/tests/unit/google/cloud/transfers/test_gdrive_to_gcs.py
index 03890c3ee0b..0c5595917ac 100644
--- a/providers/google/tests/unit/google/cloud/transfers/test_gdrive_to_gcs.py
+++ b/providers/google/tests/unit/google/cloud/transfers/test_gdrive_to_gcs.py
@@ -48,7 +48,7 @@ class TestGoogleDriveToGCSOperator:
meta = {"id": "123xyz"}
mock_gdrive_hook.return_value.get_file_id.return_value = meta
- op.execute(context)
+ result = op.execute(context)
mock_gdrive_hook.return_value.get_file_id.assert_called_once_with(
folder_id=FOLDER_ID, file_name=FILE_NAME, drive_id=DRIVE_ID
)
@@ -61,4 +61,6 @@ class TestGoogleDriveToGCSOperator:
bucket_name=BUCKET, object_name=OBJECT
)
+ # Assert list with GCS URI is returned
+ assert result == [f"gs://{BUCKET}/{OBJECT}"]
assert op.dry_run() is None
diff --git a/providers/google/tests/unit/google/cloud/transfers/test_sheets_to_gcs.py b/providers/google/tests/unit/google/cloud/transfers/test_sheets_to_gcs.py
index 5e4446a6c20..2b062ec06e2 100644
--- a/providers/google/tests/unit/google/cloud/transfers/test_sheets_to_gcs.py
+++ b/providers/google/tests/unit/google/cloud/transfers/test_sheets_to_gcs.py
@@ -81,7 +81,12 @@ class TestGoogleSheetsToGCSOperator:
@mock.patch(
"airflow.providers.google.cloud.transfers.sheets_to_gcs.GoogleSheetsToGCSOperator._upload_data"
)
- def test_execute(self, mock_upload_data, mock_sheet_hook, mock_gcs_hook):
+ def test_execute_with_return_gcs_uris_true(
+ self,
+ mock_upload_data,
+ mock_sheet_hook,
+ mock_gcs_hook,
+ ):
mock_ti = mock.MagicMock()
mock_context = {"ti": mock_ti}
data = ["data1", "data2"]
@@ -97,8 +102,9 @@ class TestGoogleSheetsToGCSOperator:
destination_path=PATH,
gcp_conn_id=GCP_CONN_ID,
impersonation_chain=IMPERSONATION_CHAIN,
+ return_gcs_uris=True,
)
- op.execute(mock_context)
+ result = op.execute(mock_context)
mock_sheet_hook.assert_called_once_with(
gcp_conn_id=GCP_CONN_ID,
@@ -124,4 +130,67 @@ class TestGoogleSheetsToGCSOperator:
actual_call_count = mock_upload_data.call_count
assert len(RANGES) == actual_call_count
+ expected_uris = [f"gs://{BUCKET}/{PATH}", f"gs://{BUCKET}/{PATH}"]
+ mock_ti.xcom_push.assert_called_once_with(key="destination_objects", value=expected_uris)
+ assert result == expected_uris
+
+ @mock.patch("airflow.providers.google.cloud.transfers.sheets_to_gcs.GCSHook")
+ @mock.patch("airflow.providers.google.cloud.transfers.sheets_to_gcs.GSheetsHook")
+ @mock.patch(
+ "airflow.providers.google.cloud.transfers.sheets_to_gcs.GoogleSheetsToGCSOperator._upload_data"
+ )
+ def test_execute_with_return_gcs_uris_false(
+ self,
+ mock_upload_data,
+ mock_sheet_hook,
+ mock_gcs_hook,
+ ):
+ mock_ti = mock.MagicMock()
+ mock_context = {"ti": mock_ti}
+ data = ["data1", "data2"]
+ mock_sheet_hook.return_value.get_sheet_titles.return_value = RANGES
+ mock_sheet_hook.return_value.get_values.side_effect = data
+ mock_upload_data.side_effect = [PATH, PATH]
+ op = GoogleSheetsToGCSOperator(
+ task_id="test_task",
+ spreadsheet_id=SPREADSHEET_ID,
+ destination_bucket=BUCKET,
+ sheet_filter=FILTER,
+ destination_path=PATH,
+ gcp_conn_id=GCP_CONN_ID,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ return_gcs_uris=False,
+ )
+ result = op.execute(mock_context)
+ mock_ti.xcom_push.assert_called_once_with(key="destination_objects", value=[PATH, PATH])
+ assert result == [PATH, PATH]
+
+ @mock.patch("airflow.providers.google.cloud.transfers.sheets_to_gcs.GCSHook")
+ @mock.patch("airflow.providers.google.cloud.transfers.sheets_to_gcs.GSheetsHook")
+ @mock.patch(
+ "airflow.providers.google.cloud.transfers.sheets_to_gcs.GoogleSheetsToGCSOperator._upload_data"
+ )
+ def test_execute_with_return_gcs_uris_default(
+ self,
+ mock_upload_data,
+ mock_sheet_hook,
+ mock_gcs_hook,
+ ):
+ mock_ti = mock.MagicMock()
+ mock_context = {"ti": mock_ti}
+ data = ["data1"]
+ mock_sheet_hook.return_value.get_sheet_titles.return_value = ["single_range"]
+ mock_sheet_hook.return_value.get_values.side_effect = data
+ mock_upload_data.side_effect = [PATH]
+ op = GoogleSheetsToGCSOperator(
+ task_id="test_task",
+ spreadsheet_id=SPREADSHEET_ID,
+ destination_bucket=BUCKET,
+ sheet_filter=FILTER,
+ destination_path=PATH,
+ gcp_conn_id=GCP_CONN_ID,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+ result = op.execute(mock_context)
+ mock_ti.xcom_push.assert_called_once_with(key="destination_objects", value=[PATH])
+ assert result == [PATH]