This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 02976be Refactor: BigQuery to GCS Operator (#22506)
02976be is described below
commit 02976bef885a5da29a8be59b32af51edbf94466c
Author: Shuho Yoshida <[email protected]>
AuthorDate: Mon Mar 28 05:21:35 2022 +0900
Refactor: BigQuery to GCS Operator (#22506)
---
airflow/providers/google/cloud/hooks/bigquery.py | 6 ++--
.../google/cloud/transfers/bigquery_to_gcs.py | 36 ++++++----------------
.../google/cloud/transfers/test_bigquery_to_gcs.py | 32 +++++++------------
3 files changed, 24 insertions(+), 50 deletions(-)
diff --git a/airflow/providers/google/cloud/hooks/bigquery.py b/airflow/providers/google/cloud/hooks/bigquery.py
index 1202c11..3642e8e 100644
--- a/airflow/providers/google/cloud/hooks/bigquery.py
+++ b/airflow/providers/google/cloud/hooks/bigquery.py
@@ -1905,7 +1905,7 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
def run_extract(
self,
source_project_dataset_table: str,
- destination_cloud_storage_uris: str,
+ destination_cloud_storage_uris: List[str],
compression: str = 'NONE',
export_format: str = 'CSV',
field_delimiter: str = ',',
@@ -1945,7 +1945,7 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
var_name='source_project_dataset_table',
)
- configuration = {
+ configuration: Dict[str, Any] = {
'extract': {
'sourceTable': {
'projectId': source_project,
@@ -1956,7 +1956,7 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
'destinationUris': destination_cloud_storage_uris,
'destinationFormat': export_format,
}
- } # type: Dict[str, Any]
+ }
if labels:
configuration['labels'] = labels
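For illustration only (not part of this commit): a minimal sketch of calling the hook directly with the refactored signature, where destination_cloud_storage_uris is now annotated as List[str]. The connection id, table, bucket and labels below are made-up examples.

    from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook

    # gcp_conn_id, table and bucket names here are illustrative assumptions
    hook = BigQueryHook(gcp_conn_id='google_cloud_default')
    hook.run_extract(
        source_project_dataset_table='my-project:my_dataset.my_table',
        destination_cloud_storage_uris=['gs://my-bucket/my_table_*.csv'],  # a list, per the new annotation
        compression='NONE',
        export_format='CSV',
        field_delimiter=',',
        print_header=True,
        labels={'env': 'dev'},
    )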
diff --git a/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py b/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py
index 2515f47..69c64d0 100644
--- a/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py
+++ b/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py
@@ -17,9 +17,7 @@
# under the License.
"""This module contains Google BigQuery to Google Cloud Storage operator."""
import warnings
-from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence, Union
-
-from google.cloud.bigquery.table import TableReference
+from typing import TYPE_CHECKING, Dict, List, Optional, Sequence, Union
from airflow.models import BaseOperator
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
@@ -128,26 +126,12 @@ class BigQueryToGCSOperator(BaseOperator):
location=self.location,
impersonation_chain=self.impersonation_chain,
)
-
- table_ref = TableReference.from_string(self.source_project_dataset_table, hook.project_id)
-
- configuration: Dict[str, Any] = {
- 'extract': {
- 'sourceTable': table_ref.to_api_repr(),
- 'compression': self.compression,
- 'destinationUris': self.destination_cloud_storage_uris,
- 'destinationFormat': self.export_format,
- }
- }
-
- if self.labels:
- configuration['labels'] = self.labels
-
- if self.export_format == 'CSV':
- # Only set fieldDelimiter and printHeader fields if using CSV.
- # Google does not like it if you set these fields for other export
- # formats.
- configuration['extract']['fieldDelimiter'] = self.field_delimiter
- configuration['extract']['printHeader'] = self.print_header
-
- hook.insert_job(configuration=configuration)
+ hook.run_extract(
+ source_project_dataset_table=self.source_project_dataset_table,
+ destination_cloud_storage_uris=self.destination_cloud_storage_uris,
+ compression=self.compression,
+ export_format=self.export_format,
+ field_delimiter=self.field_delimiter,
+ print_header=self.print_header,
+ labels=self.labels,
+ )
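For illustration only (not part of this commit): a minimal sketch of the refactored operator in a DAG, which now delegates the export to hook.run_extract(). The DAG id, schedule and table/bucket names are assumptions; the operator arguments mirror those exercised in the test below.

    from datetime import datetime

    from airflow import DAG
    from airflow.providers.google.cloud.transfers.bigquery_to_gcs import BigQueryToGCSOperator

    with DAG('example_bq_to_gcs', start_date=datetime(2022, 1, 1), schedule_interval=None) as dag:
        # export a BigQuery table to CSV files in GCS
        export_table = BigQueryToGCSOperator(
            task_id='bq_to_gcs',
            source_project_dataset_table='my-project:my_dataset.my_table',
            destination_cloud_storage_uris=['gs://my-bucket/my_table_*.csv'],
            export_format='CSV',
            field_delimiter=',',
            print_header=True,
        )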
diff --git a/tests/providers/google/cloud/transfers/test_bigquery_to_gcs.py b/tests/providers/google/cloud/transfers/test_bigquery_to_gcs.py
index 2ddac81..4542172 100644
--- a/tests/providers/google/cloud/transfers/test_bigquery_to_gcs.py
+++ b/tests/providers/google/cloud/transfers/test_bigquery_to_gcs.py
@@ -27,10 +27,10 @@ TEST_TABLE_ID = 'test-table-id'
PROJECT_ID = 'test-project-id'
-class TestBigQueryToCloudStorageOperator(unittest.TestCase):
+class TestBigQueryToGCSOperator(unittest.TestCase):
@mock.patch('airflow.providers.google.cloud.transfers.bigquery_to_gcs.BigQueryHook')
def test_execute(self, mock_hook):
- source_project_dataset_table = f'{TEST_DATASET}.{TEST_TABLE_ID}'
+ source_project_dataset_table = f'{PROJECT_ID}:{TEST_DATASET}.{TEST_TABLE_ID}'
destination_cloud_storage_uris = ['gs://some-bucket/some-file.txt']
compression = 'NONE'
export_format = 'CSV'
@@ -38,24 +38,6 @@ class TestBigQueryToCloudStorageOperator(unittest.TestCase):
print_header = True
labels = {'k1': 'v1'}
- mock_hook().project_id = PROJECT_ID
-
- configuration = {
- 'extract': {
- 'sourceTable': {
- 'projectId': mock_hook().project_id,
- 'datasetId': TEST_DATASET,
- 'tableId': TEST_TABLE_ID,
- },
- 'compression': compression,
- 'destinationUris': destination_cloud_storage_uris,
- 'destinationFormat': export_format,
- 'fieldDelimiter': field_delimiter,
- 'printHeader': print_header,
- },
- 'labels': labels,
- }
-
operator = BigQueryToGCSOperator(
task_id=TASK_ID,
source_project_dataset_table=source_project_dataset_table,
@@ -69,4 +51,12 @@ class TestBigQueryToCloudStorageOperator(unittest.TestCase):
operator.execute(None)
- mock_hook.return_value.insert_job.assert_called_once_with(configuration=configuration)
+ mock_hook.return_value.run_extract.assert_called_once_with(
+ source_project_dataset_table=source_project_dataset_table,
+ destination_cloud_storage_uris=destination_cloud_storage_uris,
+ compression=compression,
+ export_format=export_format,
+ field_delimiter=field_delimiter,
+ print_header=print_header,
+ labels=labels,
+ )
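For reference (not part of this commit): the updated test module can be run on its own with pytest, e.g.

    pytest tests/providers/google/cloud/transfers/test_bigquery_to_gcs.py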