This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 606ef453a1c Provide option to `force_delete` for `GCSToBigQueryOperator` (#43785)
606ef453a1c is described below
commit 606ef453a1cb3f2c54f90fe756d9044b8edf0121
Author: Nathan Hadfield <[email protected]>
AuthorDate: Mon Nov 11 12:49:52 2024 +0000
Provide option to `force_delete` for `GCSToBigQueryOperator` (#43785)
* Adding a parameter to provide the option to force delete the destination
table if it already exists.
* Adding a test for force_delete
* Update
providers/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
Co-authored-by: Shahar Epstein <[email protected]>
---------
Co-authored-by: Shahar Epstein <[email protected]>
---
.../google/cloud/transfers/gcs_to_bigquery.py | 9 +++++++-
.../google/cloud/transfers/test_gcs_to_bigquery.py | 25 ++++++++++++++++++++++
2 files changed, 33 insertions(+), 1 deletion(-)
diff --git a/providers/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py b/providers/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
index 3fe4ccf2310..06b9a94171b 100644
--- a/providers/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
+++ b/providers/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
@@ -174,6 +174,7 @@ class GCSToBigQueryOperator(BaseOperator):
        destination table is newly created. If the table already exists and a value different than the
        current description is provided, the job will fail.
    :param deferrable: Run operator in the deferrable mode
+    :param force_delete: Force the destination table to be deleted if it already exists.
    """

    template_fields: Sequence[str] = (
template_fields: Sequence[str] = (
@@ -231,6 +232,7 @@ class GCSToBigQueryOperator(BaseOperator):
force_rerun: bool = True,
reattach_states: set[str] | None = None,
project_id: str = PROVIDE_PROJECT_ID,
+ force_delete: bool = False,
**kwargs,
) -> None:
super().__init__(**kwargs)
@@ -296,6 +298,7 @@ class GCSToBigQueryOperator(BaseOperator):
self.force_rerun = force_rerun
self.reattach_states: set[str] = reattach_states or set()
self.cancel_on_kill = cancel_on_kill
+ self.force_delete = force_delete
self.source_uris: list[str] = []
@@ -378,7 +381,11 @@ class GCSToBigQueryOperator(BaseOperator):
                max_id = self._find_max_value_in_column()
            return max_id
        else:
-            self.log.info("Using existing BigQuery table for storing data...")
+            if self.force_delete:
+                self.log.info("Deleting table %s", self.destination_project_dataset_table)
+                hook.delete_table(table_id=self.destination_project_dataset_table)
+            else:
+                self.log.info("Using existing BigQuery table for storing data...")
            self.configuration = self._use_existing_table()

        try:
diff --git a/providers/tests/google/cloud/transfers/test_gcs_to_bigquery.py
b/providers/tests/google/cloud/transfers/test_gcs_to_bigquery.py
index 046e9a711bb..1097faa7491 100644
--- a/providers/tests/google/cloud/transfers/test_gcs_to_bigquery.py
+++ b/providers/tests/google/cloud/transfers/test_gcs_to_bigquery.py
@@ -1946,3 +1946,28 @@ class TestAsyncGCSToBigQueryOperator:
"task_instance": task_instance,
"logical_date": logical_date,
}
+
+ @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+ def test_force_delete_should_execute_successfully(self, hook):
+ hook.return_value.insert_job.side_effect = [
+ MagicMock(job_id=REAL_JOB_ID, error_result=False),
+ REAL_JOB_ID,
+ ]
+ hook.return_value.generate_job_id.return_value = REAL_JOB_ID
+        hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE)
+ hook.return_value.get_job.return_value.result.return_value = ("1",)
+
+ operator = GCSToBigQueryOperator(
+ task_id=TASK_ID,
+ bucket=TEST_BUCKET,
+ source_objects=TEST_SOURCE_OBJECTS,
+ destination_project_dataset_table=TEST_EXPLICIT_DEST,
+ write_disposition=WRITE_DISPOSITION,
+ schema_fields=SCHEMA_FIELDS_INT,
+ autodetect=True,
+ project_id=JOB_PROJECT_ID,
+ force_delete=True,
+ )
+
+        operator.execute(context=MagicMock())
+
+        hook.return_value.delete_table.assert_called_once_with(table_id=TEST_EXPLICIT_DEST)