This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new d74a2f53e47 Fix ReadAllFromBigQuery leak temp dataset (#31895)
d74a2f53e47 is described below
commit d74a2f53e474c206ee329bdb747cf9302a221a65
Author: Yi Hu <[email protected]>
AuthorDate: Tue Jul 16 11:20:50 2024 -0400
Fix ReadAllFromBigQuery leak temp dataset (#31895)
* Fix ReadAllFromBigQuery leak temp dataset
* Fix potential duplicate job name
---
.../apache_beam/io/gcp/bigquery_read_internal.py | 49 +++++++++++-----------
.../apache_beam/io/gcp/bigquery_read_it_test.py | 4 +-
2 files changed, 27 insertions(+), 26 deletions(-)
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_read_internal.py b/sdks/python/apache_beam/io/gcp/bigquery_read_internal.py
index ce49cd0161d..f74b7dabfb7 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_read_internal.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_read_internal.py
@@ -24,7 +24,7 @@ import collections
import decimal
import json
import logging
-import random
+import secrets
import time
import uuid
from typing import TYPE_CHECKING
@@ -212,7 +212,7 @@ class _BigQueryReadSplit(beam.transforms.DoFn):
self._source_uuid = unique_id
self.kms_key = kms_key
self.project = project
- self.temp_dataset = temp_dataset or 'bq_read_all_%s' % uuid.uuid4().hex
+ self.temp_dataset = temp_dataset
self.query_priority = query_priority
self.bq_io_metadata = None
@@ -226,22 +226,27 @@ class _BigQueryReadSplit(beam.transforms.DoFn):
'temp_dataset': str(self.temp_dataset)
}
- def _get_temp_dataset(self):
- if isinstance(self.temp_dataset, str):
- return DatasetReference(
- datasetId=self.temp_dataset, projectId=self._get_project())
- else:
+ def _get_temp_dataset_id(self):
+ if self.temp_dataset is None:
+ return None
+ elif isinstance(self.temp_dataset, DatasetReference):
+ return self.temp_dataset.datasetId
+ elif isinstance(self.temp_dataset, str):
return self.temp_dataset
+ else:
+ raise ValueError("temp_dataset has to be either str or DatasetReference")
- def process(self,
- element: 'ReadFromBigQueryRequest') -> Iterable[BoundedSource]:
- bq = bigquery_tools.BigQueryWrapper(
- temp_dataset_id=self._get_temp_dataset().datasetId,
+ def setup(self):
+ self.bq = bigquery_tools.BigQueryWrapper(
+ temp_dataset_id=self._get_temp_dataset_id(),
client=bigquery_tools.BigQueryWrapper._bigquery_client(self.options))
+ def process(self,
+ element: 'ReadFromBigQueryRequest') -> Iterable[BoundedSource]:
if element.query is not None:
- self._setup_temporary_dataset(bq, element)
- table_reference = self._execute_query(bq, element)
+ if not self.bq.created_temp_dataset:
+ self._setup_temporary_dataset(self.bq, element)
+ table_reference = self._execute_query(self.bq, element)
else:
assert element.table
table_reference = bigquery_tools.parse_table_reference(
@@ -250,19 +255,21 @@ class _BigQueryReadSplit(beam.transforms.DoFn):
if not table_reference.projectId:
table_reference.projectId = self._get_project()
- schema, metadata_list = self._export_files(bq, element, table_reference)
+ schema, metadata_list = self._export_files(
+ self.bq, element, table_reference)
for metadata in metadata_list:
yield self._create_source(metadata.path, schema)
if element.query is not None:
- bq._delete_table(
+ self.bq._delete_table(
table_reference.projectId,
table_reference.datasetId,
table_reference.tableId)
- if bq.created_temp_dataset:
- self._clean_temporary_dataset(bq, element)
+ def teardown(self):
+ if self.bq.created_temp_dataset:
+ self.bq.clean_up_temporary_dataset(self._get_project())
def _get_bq_metadata(self):
if not self.bq_io_metadata:
@@ -288,12 +295,6 @@ class _BigQueryReadSplit(beam.transforms.DoFn):
self._get_project(), element.query, not element.use_standard_sql)
bq.create_temporary_dataset(self._get_project(), location)
- def _clean_temporary_dataset(
- self,
- bq: bigquery_tools.BigQueryWrapper,
- element: 'ReadFromBigQueryRequest'):
- bq.clean_up_temporary_dataset(self._get_project())
-
def _execute_query(
self,
bq: bigquery_tools.BigQueryWrapper,
@@ -302,7 +303,7 @@ class _BigQueryReadSplit(beam.transforms.DoFn):
self._job_name,
self._source_uuid,
bigquery_tools.BigQueryJobTypes.QUERY,
- '%s_%s' % (int(time.time()), random.randint(0, 1000)))
+ '%s_%s' % (int(time.time()), secrets.token_hex(3)))
job = bq._start_query_job(
self._get_project(),
element.query,
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
index d56a4c76471..913d6e078d8 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
@@ -109,11 +109,11 @@ class BigQueryReadIntegrationTests(unittest.TestCase):
request = bigquery.BigqueryDatasetsDeleteRequest(
projectId=cls.project, datasetId=cls.dataset_id, deleteContents=True)
try:
- _LOGGER.info(
+ _LOGGER.debug(
"Deleting dataset %s in project %s", cls.dataset_id, cls.project)
cls.bigquery_client.client.datasets.Delete(request)
except HttpError:
- _LOGGER.debug(
+ _LOGGER.warning(
'Failed to clean up dataset %s in project %s',
cls.dataset_id,
cls.project)