[ 
https://issues.apache.org/jira/browse/AIRFLOW-2887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16623726#comment-16623726
 ] 

ASF GitHub Bot commented on AIRFLOW-2887:
-----------------------------------------

kaxil closed pull request #3876: [AIRFLOW-2887] Add to BigQueryBaseCursor 
methods for insert (create empty) dataset
URL: https://github.com/apache/incubator-airflow/pull/3876
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/airflow/contrib/hooks/bigquery_hook.py 
b/airflow/contrib/hooks/bigquery_hook.py
index 693270fad4..dd77df1283 100644
--- a/airflow/contrib/hooks/bigquery_hook.py
+++ b/airflow/contrib/hooks/bigquery_hook.py
@@ -960,7 +960,7 @@ def run_load(self,
         if not set(allowed_schema_update_options).issuperset(
                 set(schema_update_options)):
             raise ValueError(
-                "{0} contains invalid schema update options. "
+                "{0} contains invalid schema update options."
                 "Please only use one or more of the following options: {1}"
                 .format(schema_update_options, allowed_schema_update_options))
 
@@ -1350,6 +1350,72 @@ def run_grant_dataset_view_access(self,
                 view_project, view_dataset, view_table, source_project, 
source_dataset)
             return source_dataset_resource
 
+    def create_empty_dataset(self, dataset_id="", project_id="",
+                             dataset_reference=None):
+        """
+        Create a new empty dataset:
+        
https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/insert
+
+        :param project_id: The name of the project where we want to create
+            an empty a dataset. Don't need to provide, if projectId in 
dataset_reference.
+        :type project_id: str
+        :param dataset_id: The id of dataset. Don't need to provide,
+            if datasetId in dataset_reference.
+        :type dataset_id: str
+        :param dataset_reference: Dataset reference that could be provided
+            with request body. More info:
+            
https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource
+        :type dataset_reference: dict
+        """
+
+        if dataset_reference:
+            _validate_value('dataset_reference', dataset_reference, dict)
+        else:
+            dataset_reference = {}
+
+        if "datasetReference" not in dataset_reference:
+            dataset_reference["datasetReference"] = {}
+
+        if not dataset_reference["datasetReference"].get("datasetId") and not 
dataset_id:
+            raise ValueError(
+                "{} not provided datasetId. Impossible to create dataset")
+
+        dataset_required_params = [(dataset_id, "datasetId", ""),
+                                   (project_id, "projectId", self.project_id)]
+        for param_tuple in dataset_required_params:
+            param, param_name, param_default = param_tuple
+            if param_name not in dataset_reference['datasetReference']:
+                if param_default and not param:
+                    self.log.info("{} was not specified. Will be used default "
+                                  "value {}.".format(param_name,
+                                                     param_default))
+                    param = param_default
+                dataset_reference['datasetReference'].update(
+                    {param_name: param})
+            elif param:
+                _api_resource_configs_duplication_check(
+                    param_name, param,
+                    dataset_reference['datasetReference'], 'dataset_reference')
+
+        dataset_id = dataset_reference.get("datasetReference").get("datasetId")
+        dataset_project_id = dataset_reference.get("datasetReference").get(
+            "projectId")
+
+        self.log.info('Creating Dataset: %s in project: %s ', dataset_id,
+                      dataset_project_id)
+
+        try:
+            self.service.datasets().insert(
+                projectId=dataset_project_id,
+                body=dataset_reference).execute()
+            self.log.info('Dataset created successfully: In project %s '
+                          'Dataset %s', dataset_project_id, dataset_id)
+
+        except HttpError as err:
+            raise AirflowException(
+                'BigQuery job failed. Error was: {}'.format(err.content)
+            )
+
     def delete_dataset(self, project_id, dataset_id):
         """
         Delete a dataset of Big query in your project.
@@ -1671,10 +1737,11 @@ def _validate_value(key, value, expected_type):
             key, expected_type, type(value)))
 
 
-def _api_resource_configs_duplication_check(key, value, config_dict):
+def _api_resource_configs_duplication_check(key, value, config_dict,
+                                            
config_dict_name='api_resource_configs'):
     if key in config_dict and value != config_dict[key]:
         raise ValueError("Values of {param_name} param are duplicated. "
-                         "`api_resource_configs` contained {param_name} param "
+                         "{dict_name} contained {param_name} param "
                          "in `query` config and {param_name} was also provided 
"
                          "with arg to run_query() method. Please remove 
duplicates."
-                         .format(param_name=key))
+                         .format(param_name=key, dict_name=config_dict_name))
diff --git a/airflow/contrib/operators/bigquery_operator.py 
b/airflow/contrib/operators/bigquery_operator.py
index 8eee6a395c..9386e57c07 100644
--- a/airflow/contrib/operators/bigquery_operator.py
+++ b/airflow/contrib/operators/bigquery_operator.py
@@ -528,12 +528,11 @@ class BigQueryDeleteDatasetOperator(BaseOperator):
 
     **Example**: ::
 
-        delete_temp_data = BigQueryDeleteDatasetOperator(
-                                        dataset_id = 'temp-dataset',
-                                        project_id = 'temp-project',
-                                        bigquery_conn_id='_my_gcp_conn_',
-                                        task_id='Deletetemp',
-                                        dag=dag)
+        delete_temp_data = BigQueryDeleteDatasetOperator(dataset_id = 
'temp-dataset',
+                                                         project_id = 
'temp-project',
+                                                         
bigquery_conn_id='_my_gcp_conn_',
+                                                         task_id='Deletetemp',
+                                                         dag=dag)
     """
 
     template_fields = ('dataset_id', 'project_id')
@@ -567,3 +566,66 @@ def execute(self, context):
             project_id=self.project_id,
             dataset_id=self.dataset_id
         )
+
+
+class BigQueryCreateEmptyDatasetOperator(BaseOperator):
+    """"
+    This operator is used to create new dataset for your Project in Big query.
+    https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource
+
+    :param project_id: The name of the project where we want to create the 
dataset.
+        Don't need to provide, if projectId in dataset_reference.
+    :type project_id: str
+    :param dataset_id: The id of dataset. Don't need to provide,
+        if datasetId in dataset_reference.
+    :type dataset_id: str
+    :param dataset_reference: Dataset reference that could be provided with 
request body.
+        More info:
+        
https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource
+    :type dataset_reference: dict
+
+        **Example**: ::
+
+            create_new_dataset = BigQueryCreateEmptyDatasetOperator(
+                                    dataset_id = 'new-dataset',
+                                    project_id = 'my-project',
+                                    dataset_reference = {"friendlyName": "New 
Dataset"}
+                                    bigquery_conn_id='_my_gcp_conn_',
+                                    task_id='newDatasetCreator',
+                                    dag=dag)
+
+    """
+
+    template_fields = ('dataset_id', 'project_id')
+    ui_color = '#f0eee4'
+
+    @apply_defaults
+    def __init__(self,
+                 dataset_id,
+                 project_id=None,
+                 dataset_reference=None,
+                 bigquery_conn_id='bigquery_default',
+                 delegate_to=None,
+                 *args, **kwargs):
+        self.dataset_id = dataset_id
+        self.project_id = project_id
+        self.bigquery_conn_id = bigquery_conn_id
+        self.dataset_reference = dataset_reference if dataset_reference else {}
+        self.delegate_to = delegate_to
+
+        self.log.info('Dataset id: %s', self.dataset_id)
+        self.log.info('Project id: %s', self.project_id)
+
+        super(BigQueryCreateEmptyDatasetOperator, self).__init__(*args, 
**kwargs)
+
+    def execute(self, context):
+        bq_hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
+                               delegate_to=self.delegate_to)
+
+        conn = bq_hook.get_conn()
+        cursor = conn.cursor()
+
+        cursor.create_empty_dataset(
+            project_id=self.project_id,
+            dataset_id=self.dataset_id,
+            dataset_reference=self.dataset_reference)
diff --git a/docs/code.rst b/docs/code.rst
index fcabca0000..e5a53193e6 100644
--- a/docs/code.rst
+++ b/docs/code.rst
@@ -120,6 +120,7 @@ Operators
 .. autoclass:: 
airflow.contrib.operators.bigquery_operator.BigQueryCreateEmptyTableOperator
 .. autoclass:: 
airflow.contrib.operators.bigquery_operator.BigQueryCreateExternalTableOperator
 .. autoclass:: 
airflow.contrib.operators.bigquery_operator.BigQueryDeleteDatasetOperator
+.. autoclass:: 
airflow.contrib.operators.bigquery_operator.BigQueryCreateEmptyDatasetOperator
 .. autoclass:: airflow.contrib.operators.bigquery_operator.BigQueryOperator
 .. autoclass:: 
airflow.contrib.operators.bigquery_table_delete_operator.BigQueryTableDeleteOperator
 .. autoclass:: 
airflow.contrib.operators.bigquery_to_bigquery.BigQueryToBigQueryOperator
diff --git a/docs/integration.rst b/docs/integration.rst
index 4c513bf26d..522d4bbd44 100644
--- a/docs/integration.rst
+++ b/docs/integration.rst
@@ -350,6 +350,7 @@ BigQuery Operators
 - :ref:`BigQueryCreateEmptyTableOperator` : Creates a new, empty table in the 
specified BigQuery dataset optionally with schema.
 - :ref:`BigQueryCreateExternalTableOperator` : Creates a new, external table 
in the dataset with the data in Google Cloud Storage.
 - :ref:`BigQueryDeleteDatasetOperator` : Deletes an existing BigQuery dataset.
+- :ref:`BigQueryCreateEmptyDatasetOperator` : Creates an empty BigQuery 
dataset.
 - :ref:`BigQueryOperator` : Executes BigQuery SQL queries in a specific 
BigQuery database.
 - :ref:`BigQueryToBigQueryOperator` : Copy a BigQuery table to another 
BigQuery table.
 - :ref:`BigQueryToCloudStorageOperator` : Transfers a BigQuery table to a 
Google Cloud Storage bucket
@@ -404,6 +405,13 @@ BigQueryDeleteDatasetOperator
 
 .. autoclass:: 
airflow.contrib.operators.bigquery_operator.BigQueryDeleteDatasetOperator
 
+.. _BigQueryCreateEmptyDatasetOperator:
+
+BigQueryCreateEmptyDatasetOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+.. autoclass:: 
airflow.contrib.operators.bigquery_operator.BigQueryCreateEmptyDatasetOperator
+
 .. _BigQueryOperator:
 
 BigQueryOperator
diff --git a/tests/contrib/hooks/test_bigquery_hook.py 
b/tests/contrib/hooks/test_bigquery_hook.py
index 0006b0c616..84fe84043e 100644
--- a/tests/contrib/hooks/test_bigquery_hook.py
+++ b/tests/contrib/hooks/test_bigquery_hook.py
@@ -337,8 +337,31 @@ def run_with_config(config):
         mocked_rwc.assert_called_once()
 
 
-class TestTimePartitioningInRunJob(unittest.TestCase):
+class TestDatasetsOperations(unittest.TestCase):
+
+    @mock.patch.object(hook.BigQueryBaseCursor, 'run_with_configuration')
+    def test_create_empty_dataset_no_dataset_id_err(self,
+                                                    run_with_configuration):
+
+        with self.assertRaises(ValueError):
+            hook.BigQueryBaseCursor(
+                mock.Mock(), "test_create_empty_dataset").create_empty_dataset(
+                dataset_id="", project_id="")
 
+    @mock.patch.object(hook.BigQueryBaseCursor, 'run_with_configuration')
+    def test_create_empty_dataset_duplicates_call_err(self,
+                                                      run_with_configuration):
+        with self.assertRaises(ValueError):
+            hook.BigQueryBaseCursor(
+                mock.Mock(), "test_create_empty_dataset").create_empty_dataset(
+                dataset_id="", project_id="project_test",
+                dataset_reference={
+                    "datasetReference":
+                        {"datasetId": "test_dataset",
+                         "projectId": "project_test2"}})
+
+
+class TestTimePartitioningInRunJob(unittest.TestCase):
     @mock.patch("airflow.contrib.hooks.bigquery_hook.LoggingMixin")
     @mock.patch("airflow.contrib.hooks.bigquery_hook.time")
     @mock.patch.object(hook.BigQueryBaseCursor, 'run_with_configuration')
diff --git a/tests/contrib/operators/test_bigquery_operator.py 
b/tests/contrib/operators/test_bigquery_operator.py
index 7c76ab73fe..9ce3b478d3 100644
--- a/tests/contrib/operators/test_bigquery_operator.py
+++ b/tests/contrib/operators/test_bigquery_operator.py
@@ -22,8 +22,8 @@
 
 from airflow.contrib.operators.bigquery_operator import \
     BigQueryCreateExternalTableOperator, \
-    BigQueryOperator, \
-    BigQueryCreateEmptyTableOperator, BigQueryDeleteDatasetOperator
+    BigQueryOperator, BigQueryCreateEmptyTableOperator, \
+    BigQueryDeleteDatasetOperator, BigQueryCreateEmptyDatasetOperator
 
 try:
     from unittest import mock
@@ -136,3 +136,24 @@ def test_execute(self, mock_hook):
                 dataset_id=TEST_DATASET,
                 project_id=TEST_PROJECT_ID
             )
+
+
+class BigQueryCreateEmptyDatasetOperatorTest(unittest.TestCase):
+    @mock.patch('airflow.contrib.operators.bigquery_operator.BigQueryHook')
+    def test_execute(self, mock_hook):
+        operator = BigQueryCreateEmptyDatasetOperator(
+            task_id=TASK_ID,
+            dataset_id=TEST_DATASET,
+            project_id=TEST_PROJECT_ID
+        )
+
+        operator.execute(None)
+        mock_hook.return_value \
+            .get_conn() \
+            .cursor() \
+            .create_empty_dataset \
+            .assert_called_once_with(
+                dataset_id=TEST_DATASET,
+                project_id=TEST_PROJECT_ID,
+                dataset_reference={}
+            )


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Add to BigQueryBaseCursor methods for creating insert dataset
> -------------------------------------------------------------
>
>                 Key: AIRFLOW-2887
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-2887
>             Project: Apache Airflow
>          Issue Type: New Feature
>            Reporter: Iuliia Volkova
>            Assignee: Iuliia Volkova
>            Priority: Minor
>             Fix For: 2.0.0
>
>
> In BigQueryBaseCursor exist only:
> def delete_dataset(self, project_id, dataset_id)
>  And there are no hook to 
> create([https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/insert)]
>   



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to