ryanyuan closed pull request #4164: [AIRFLOW-3318] BigQueryHook check if dataset exists
URL: https://github.com/apache/incubator-airflow/pull/4164
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub does not otherwise display it:

diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py
index d300dbe6b7..1c6f7329cd 100644
--- a/airflow/contrib/hooks/bigquery_hook.py
+++ b/airflow/contrib/hooks/bigquery_hook.py
@@ -1515,6 +1515,32 @@ def get_datasets_list(self, project_id=None):
 
         return datasets_list
 
+    def dataset_exists(self, project_id, dataset_id):
+        """
+        Checks for the existence of a dataset in Google BigQuery.
+
+        :param project_id: The Google cloud project in which to look for the
+            dataset. The connection supplied to the hook must provide access to
+            the specified project.
+        :type project_id: str
+        :param dataset_id: The name of the dataset to check the existence of.
+        :type dataset_id: str
+        :return: True if the dataset exists, False if the API responds with
+            HTTP 404 (not found).
+        :rtype: bool
+        :raises HttpError: any API error other than a 404 is re-raised as-is.
+        """
+        try:
+            self.service.datasets().get(
+                projectId=project_id, datasetId=dataset_id).execute()
+            return True
+        except errors.HttpError as e:
+            # Only "not found" means the dataset is absent; any other status
+            # (e.g. 403) propagates so it is not mistaken for a missing dataset.
+            if e.resp['status'] == '404':
+                return False
+            raise
+
 
 class BigQueryCursor(BigQueryBaseCursor):
     """
diff --git a/tests/contrib/hooks/test_bigquery_hook.py b/tests/contrib/hooks/test_bigquery_hook.py
index 8f350ff2ee..ad0bef1694 100644
--- a/tests/contrib/hooks/test_bigquery_hook.py
+++ b/tests/contrib/hooks/test_bigquery_hook.py
@@ -22,13 +22,15 @@
 
 from google.auth.exceptions import GoogleAuthError
 import mock
+from googleapiclient import errors
 
 from airflow.contrib.hooks import bigquery_hook as hook
 from airflow.contrib.hooks.bigquery_hook import _cleanse_time_partitioning, \
     _validate_value, _api_resource_configs_duplication_check
 
+EMPTY_CONTENT = b''
 bq_available = True
 
 try:
     hook.BigQueryHook().get_service()
 except GoogleAuthError:
@@ -401,6 +403,70 @@ def test_get_datasets_list(self):
                 project_id=project_id)
             self.assertEqual(result, expected_result['datasets'])
 
+    def test_check_dataset_exists(self):
+        dataset_id = "dataset_test"
+        project_id = "project-test"
+        dataset_result = {
+            "kind": "bigquery#dataset",
+            "location": "US",
+            "id": "{}:{}".format(project_id, dataset_id),
+            "datasetReference": {
+                "projectId": project_id,
+                "datasetId": dataset_id
+            }
+        }
+
+        mocked = mock.Mock()
+        with mock.patch.object(
+            hook.BigQueryBaseCursor(mocked, project_id).service, "datasets"
+        ) as mock_service:
+            # Configure get() via return_value rather than by calling it, so
+            # the assertion below sees only the request made by the hook.
+            mock_get = mock_service.return_value.get
+            mock_get.return_value.execute.return_value = dataset_result
+            result = hook.BigQueryBaseCursor(
+                mocked, "test_check_dataset_exists"
+            ).dataset_exists(dataset_id=dataset_id, project_id=project_id)
+            self.assertTrue(result)
+            mock_get.assert_called_once_with(
+                projectId=project_id, datasetId=dataset_id)
+
+    def test_check_dataset_exists_not_exist(self):
+        dataset_id = "dataset_test"
+        project_id = "project-test"
+
+        mocked = mock.Mock()
+        with mock.patch.object(
+            hook.BigQueryBaseCursor(mocked, project_id).service, "datasets"
+        ) as mock_service:
+            mock_get = mock_service.return_value.get
+            mock_get.return_value.execute.side_effect = errors.HttpError(
+                resp={"status": "404"}, content=EMPTY_CONTENT)
+            result = hook.BigQueryBaseCursor(
+                mocked, "test_check_dataset_exists_not_exist"
+            ).dataset_exists(dataset_id=dataset_id, project_id=project_id)
+            self.assertFalse(result)
+            mock_get.assert_called_once_with(
+                projectId=project_id, datasetId=dataset_id)
+
+    def test_check_dataset_exists_reraises_other_errors(self):
+        dataset_id = "dataset_test"
+        project_id = "project-test"
+
+        mocked = mock.Mock()
+        with mock.patch.object(
+            hook.BigQueryBaseCursor(mocked, project_id).service, "datasets"
+        ) as mock_service:
+            # A non-404 error must surface instead of being reported as
+            # "dataset does not exist".
+            mock_get = mock_service.return_value.get
+            mock_get.return_value.execute.side_effect = errors.HttpError(
+                resp={"status": "403"}, content=EMPTY_CONTENT)
+            with self.assertRaises(errors.HttpError):
+                hook.BigQueryBaseCursor(
+                    mocked, "test_check_dataset_exists_reraises_other_errors"
+                ).dataset_exists(dataset_id=dataset_id, project_id=project_id)
+
 
 class TestTimePartitioningInRunJob(unittest.TestCase):
     @mock.patch("airflow.contrib.hooks.bigquery_hook.LoggingMixin")


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log in to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to