add temp dataset location for non-query BigQuerySource
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/83dc58ee
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/83dc58ee
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/83dc58ee

Branch: refs/heads/gearpump-runner
Commit: 83dc58ee95e96586266c72f29e9d7c55c8ca0739
Parents: 3ef614c
Author: Uwe Jugel <[email protected]>
Authored: Wed Apr 12 14:56:50 2017 +0200
Committer: [email protected] <[email protected]>
Committed: Wed Apr 19 17:55:12 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/gcp/bigquery.py | 47 ++++++++++++++++++++-----
 1 file changed, 38 insertions(+), 9 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam/blob/83dc58ee/sdks/python/apache_beam/io/gcp/bigquery.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 25f544d..4686518 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -605,9 +605,27 @@ class BigQueryReader(dataflow_io.NativeSourceReader):
     else:
       self.query = self.source.query
 
+  def _get_source_table_location(self):
+    tr = self.source.table_reference
+    if tr is None:
+      # TODO: implement location retrieval for query sources
+      return
+
+    if tr.projectId is None:
+      source_project_id = self.executing_project
+    else:
+      source_project_id = tr.projectId
+
+    source_dataset_id = tr.datasetId
+    source_table_id = tr.tableId
+    source_location = self.client.get_table_location(
+        source_project_id, source_dataset_id, source_table_id)
+    return source_location
+
   def __enter__(self):
     self.client = BigQueryWrapper(client=self.test_bigquery_client)
-    self.client.create_temporary_dataset(self.executing_project)
+    self.client.create_temporary_dataset(
+        self.executing_project, location=self._get_source_table_location())
     return self
 
   def __exit__(self, exception_type, exception_value, traceback):
@@ -801,7 +819,7 @@ class BigQueryWrapper(object):
   @retry.with_exponential_backoff(
       num_retries=MAX_RETRIES,
       retry_filter=retry.retry_on_server_errors_and_timeout_filter)
-  def get_or_create_dataset(self, project_id, dataset_id):
+  def get_or_create_dataset(self, project_id, dataset_id, location=None):
     # Check if dataset already exists otherwise create it
     try:
       dataset = self.client.datasets.Get(bigquery.BigqueryDatasetsGetRequest(
@@ -809,9 +827,11 @@ class BigQueryWrapper(object):
       return dataset
     except HttpError as exn:
       if exn.status_code == 404:
-        dataset = bigquery.Dataset(
-            datasetReference=bigquery.DatasetReference(
-                projectId=project_id, datasetId=dataset_id))
+        dataset_reference = bigquery.DatasetReference(
+            projectId=project_id, datasetId=dataset_id)
+        dataset = bigquery.Dataset(datasetReference=dataset_reference)
+        if location is not None:
+          dataset.location = location
         request = bigquery.BigqueryDatasetsInsertRequest(
             projectId=project_id, dataset=dataset)
         response = self.client.datasets.Insert(request)
@@ -867,7 +887,15 @@ class BigQueryWrapper(object):
   @retry.with_exponential_backoff(
       num_retries=MAX_RETRIES,
       retry_filter=retry.retry_on_server_errors_and_timeout_filter)
-  def create_temporary_dataset(self, project_id):
+  def get_table_location(self, project_id, dataset_id, table_id):
+    table = self._get_table(project_id, dataset_id, table_id)
+    return table.location
+
+  @retry.with_exponential_backoff(
+      num_retries=MAX_RETRIES,
+      retry_filter=retry.retry_on_server_errors_and_timeout_filter)
+  def create_temporary_dataset(self, project_id, location=None):
+    # TODO: make location required, once "query" locations can be determined
     dataset_id = BigQueryWrapper.TEMP_DATASET + self._temporary_table_suffix
     # Check if dataset exists to make sure that the temporary id is unique
     try:
@@ -881,9 +909,10 @@ class BigQueryWrapper(object):
     except HttpError as exn:
       if exn.status_code == 404:
         logging.warning(
-            'Dataset %s:%s does not exist so we will create it as temporary',
-            project_id, dataset_id)
-        self.get_or_create_dataset(project_id, dataset_id)
+            'Dataset %s:%s does not exist so we will create it as temporary '
+            'with location=%s',
+            project_id, dataset_id, location)
+        self.get_or_create_dataset(project_id, dataset_id, location=location)
       else:
         raise
