This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 606ef453a1c Provide option to `force_delete` for `GCSToBigQueryOperator` (#43785)
606ef453a1c is described below

commit 606ef453a1cb3f2c54f90fe756d9044b8edf0121
Author: Nathan Hadfield <[email protected]>
AuthorDate: Mon Nov 11 12:49:52 2024 +0000

    Provide option to `force_delete` for `GCSToBigQueryOperator` (#43785)
    
    * Adding a parameter to provide the option to force delete the destination table if it already exists.
    
    * Adding a test for force_delete
    
    * Update providers/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
    
    Co-authored-by: Shahar Epstein <[email protected]>
    
    ---------
    
    Co-authored-by: Shahar Epstein <[email protected]>
---
 .../google/cloud/transfers/gcs_to_bigquery.py      |  9 +++++++-
 .../google/cloud/transfers/test_gcs_to_bigquery.py | 25 ++++++++++++++++++++++
 2 files changed, 33 insertions(+), 1 deletion(-)

diff --git a/providers/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py b/providers/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
index 3fe4ccf2310..06b9a94171b 100644
--- a/providers/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
+++ b/providers/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
@@ -174,6 +174,7 @@ class GCSToBigQueryOperator(BaseOperator):
         destination table is newly created. If the table already exists and a value different than the
         current description is provided, the job will fail.
     :param deferrable: Run operator in the deferrable mode
+    :param force_delete: Force the destination table to be deleted if it already exists.
     """
 
     template_fields: Sequence[str] = (
@@ -231,6 +232,7 @@ class GCSToBigQueryOperator(BaseOperator):
         force_rerun: bool = True,
         reattach_states: set[str] | None = None,
         project_id: str = PROVIDE_PROJECT_ID,
+        force_delete: bool = False,
         **kwargs,
     ) -> None:
         super().__init__(**kwargs)
@@ -296,6 +298,7 @@ class GCSToBigQueryOperator(BaseOperator):
         self.force_rerun = force_rerun
         self.reattach_states: set[str] = reattach_states or set()
         self.cancel_on_kill = cancel_on_kill
+        self.force_delete = force_delete
 
         self.source_uris: list[str] = []
 
@@ -378,7 +381,11 @@ class GCSToBigQueryOperator(BaseOperator):
                 max_id = self._find_max_value_in_column()
                 return max_id
         else:
-            self.log.info("Using existing BigQuery table for storing data...")
+            if self.force_delete:
+                self.log.info("Deleting table %s", self.destination_project_dataset_table)
+                hook.delete_table(table_id=self.destination_project_dataset_table)
+            else:
+                self.log.info("Using existing BigQuery table for storing data...")
             self.configuration = self._use_existing_table()
 
             try:
diff --git a/providers/tests/google/cloud/transfers/test_gcs_to_bigquery.py b/providers/tests/google/cloud/transfers/test_gcs_to_bigquery.py
index 046e9a711bb..1097faa7491 100644
--- a/providers/tests/google/cloud/transfers/test_gcs_to_bigquery.py
+++ b/providers/tests/google/cloud/transfers/test_gcs_to_bigquery.py
@@ -1946,3 +1946,28 @@ class TestAsyncGCSToBigQueryOperator:
             "task_instance": task_instance,
             "logical_date": logical_date,
         }
+
+    @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+    def test_force_delete_should_execute_successfully(self, hook):
+        hook.return_value.insert_job.side_effect = [
+            MagicMock(job_id=REAL_JOB_ID, error_result=False),
+            REAL_JOB_ID,
+        ]
+        hook.return_value.generate_job_id.return_value = REAL_JOB_ID
+        hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE)
+        hook.return_value.get_job.return_value.result.return_value = ("1",)
+
+        operator = GCSToBigQueryOperator(
+            task_id=TASK_ID,
+            bucket=TEST_BUCKET,
+            source_objects=TEST_SOURCE_OBJECTS,
+            destination_project_dataset_table=TEST_EXPLICIT_DEST,
+            write_disposition=WRITE_DISPOSITION,
+            schema_fields=SCHEMA_FIELDS_INT,
+            autodetect=True,
+            project_id=JOB_PROJECT_ID,
+            force_delete=True,
+        )
+
+        operator.execute(context=MagicMock())
+        hook.return_value.delete_table.assert_called_once_with(table_id=TEST_EXPLICIT_DEST)

Reply via email to