kaxil closed pull request #3894: [AIRFLOW-3055] add get_dataset and
get_datasets_list to bigquery_hook
URL: https://github.com/apache/incubator-airflow/pull/3894
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/airflow/contrib/hooks/bigquery_hook.py
b/airflow/contrib/hooks/bigquery_hook.py
index dd77df1283..dba4618e35 100644
--- a/airflow/contrib/hooks/bigquery_hook.py
+++ b/airflow/contrib/hooks/bigquery_hook.py
@@ -1441,6 +1441,86 @@ def delete_dataset(self, project_id, dataset_id):
'BigQuery job failed. Error was: {}'.format(err.content)
)
+ def get_dataset(self, dataset_id, project_id=None):
+ """
+ Method returns dataset_resource if dataset exists
+ and raises 404 error if dataset does not exist
+
+ :param dataset_id: The BigQuery Dataset ID
+ :type dataset_id: str
+ :param project_id: The GCP Project ID
+ :type project_id: str
+ :return: dataset_resource
+
+ .. seealso::
+ For more information, see Dataset Resource content:
+
https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource
+ """
+
+ if not dataset_id or not isinstance(dataset_id, str):
+ raise ValueError("dataset_id argument must be provided and has "
+ "a type 'str'. You provided: {}".format(dataset_id))
+
+ dataset_project_id = project_id if project_id else self.project_id
+
+ try:
+ dataset_resource = self.service.datasets().get(
+ datasetId=dataset_id, projectId=dataset_project_id).execute()
+ self.log.info("Dataset Resource: {}".format(dataset_resource))
+ except HttpError as err:
+ raise AirflowException(
+ 'BigQuery job failed. Error was: {}'.format(err.content))
+
+ return dataset_resource
+
+ def get_datasets_list(self, project_id=None):
+ """
+ Method returns full list of BigQuery datasets in the current project
+
+ .. seealso::
+ For more information, see:
+
https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/list
+
+ :param project_id: Google Cloud Project for which you
+ try to get all datasets
+ :type project_id: str
+ :return: datasets_list
+
+ Example of returned datasets_list: ::
+
+ [
+ {
+ "kind":"bigquery#dataset",
+ "location":"US",
+ "id":"your-project:dataset_2_test",
+ "datasetReference":{
+ "projectId":"your-project",
+ "datasetId":"dataset_2_test"
+ }
+ },
+ {
+ "kind":"bigquery#dataset",
+ "location":"US",
+ "id":"your-project:dataset_1_test",
+ "datasetReference":{
+ "projectId":"your-project",
+ "datasetId":"dataset_1_test"
+ }
+ }
+ ]
+ """
+ dataset_project_id = project_id if project_id else self.project_id
+
+ try:
+ datasets_list = self.service.datasets().list(
+ projectId=dataset_project_id).execute()['datasets']
+ self.log.info("Datasets List: {}".format(datasets_list))
+
+ except HttpError as err:
+ raise AirflowException(
+ 'BigQuery job failed. Error was: {}'.format(err.content))
+
+ return datasets_list
+
class BigQueryCursor(BigQueryBaseCursor):
"""
diff --git a/tests/contrib/hooks/test_bigquery_hook.py
b/tests/contrib/hooks/test_bigquery_hook.py
index 84fe84043e..77a31f0320 100644
--- a/tests/contrib/hooks/test_bigquery_hook.py
+++ b/tests/contrib/hooks/test_bigquery_hook.py
@@ -360,6 +360,68 @@ def test_create_empty_dataset_duplicates_call_err(self,
{"datasetId": "test_dataset",
"projectId": "project_test2"}})
+ def test_get_dataset_without_dataset_id(self):
+ with mock.patch.object(hook.BigQueryHook, 'get_service'):
+ with self.assertRaises(ValueError):
+ hook.BigQueryBaseCursor(
+ mock.Mock(), "test_create_empty_dataset").get_dataset(
+ dataset_id="", project_id="project_test")
+
+ def test_get_dataset(self):
+ expected_result = {
+ "kind": "bigquery#dataset",
+ "location": "US",
+ "id": "your-project:dataset_2_test",
+ "datasetReference": {
+ "projectId": "your-project",
+ "datasetId": "dataset_2_test"
+ }
+ }
+ dataset_id = "test_dataset"
+ project_id = "project_test"
+
+ bq_hook = hook.BigQueryBaseCursor(mock.Mock(), project_id)
+ with mock.patch.object(bq_hook.service, 'datasets') as MockService:
+ MockService.return_value.get(datasetId=dataset_id,
+ projectId=project_id).execute.\
+ return_value = expected_result
+ result = bq_hook.get_dataset(dataset_id=dataset_id,
+ project_id=project_id)
+ self.assertEqual(result, expected_result)
+
+ def test_get_datasets_list(self):
+ expected_result = {'datasets': [
+ {
+ "kind": "bigquery#dataset",
+ "location": "US",
+ "id": "your-project:dataset_2_test",
+ "datasetReference": {
+ "projectId": "your-project",
+ "datasetId": "dataset_2_test"
+ }
+ },
+ {
+ "kind": "bigquery#dataset",
+ "location": "US",
+ "id": "your-project:dataset_1_test",
+ "datasetReference": {
+ "projectId": "your-project",
+ "datasetId": "dataset_1_test"
+ }
+ }
+ ]}
+ project_id = "project_test"
+
+ mocked = mock.Mock()
+ with mock.patch.object(hook.BigQueryBaseCursor(mocked,
project_id).service,
+ 'datasets') as MockService:
+ MockService.return_value.list(
+ projectId=project_id).execute.return_value = expected_result
+ result = hook.BigQueryBaseCursor(
+ mocked, "test_create_empty_dataset").get_datasets_list(
+ project_id=project_id)
+ self.assertEqual(result, expected_result['datasets'])
+
class TestTimePartitioningInRunJob(unittest.TestCase):
@mock.patch("airflow.contrib.hooks.bigquery_hook.LoggingMixin")
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services