pabloem commented on a change in pull request #13170:
URL: https://github.com/apache/beam/pull/13170#discussion_r522399968
##########
File path: sdks/python/apache_beam/io/gcp/bigquery.py
##########
@@ -1919,3 +1890,77 @@ def file_path_to_remove(unused_elm):
*self._args,
**self._kwargs))
| _PassThroughThenCleanup(files_to_remove_pcoll))
+
+
+@experimental()
+class ReadAllFromBigQuery(PTransform):
+ """Read data from BigQuery.
+
+ PTransform:ReadAllFromBigQueryRequest->Rows
+
+ This PTransform uses a BigQuery export job to take a snapshot of the table
+ on GCS, and then reads from each produced JSON file.
+
+ It is recommended not to use this PTransform for streaming jobs on
+ GlobalWindow, since it will not be able to cleanup snapshots.
+
+ Args:
+ gcs_location (str): The name of the Google Cloud Storage
+ bucket where the extracted table should be written as a string. If
+ :data:`None`, then the temp_location parameter is used.
+ validate (bool): If :data:`True`, various checks will be done when source
+ gets initialized (e.g., is table present?).
+ kms_key (str): Experimental. Optional Cloud KMS key name for use when
+ creating new temporary tables.
+ """
+ COUNTER = 0
+
+ def __init__(
+ self,
+ gcs_location: Union[str, ValueProvider] = None,
+ validate: bool = False,
+ kms_key: str = None,
+ bigquery_job_labels: Dict[str, str] = None):
+ if gcs_location:
+ if not isinstance(gcs_location, (str, ValueProvider)):
+ raise TypeError(
+ '%s: gcs_location must be of type string'
+ ' or ValueProvider; got %r instead' %
+ (self.__class__.__name__, type(gcs_location)))
+
+ self.gcs_location = gcs_location
+ self.validate = validate
+ self.kms_key = kms_key
+ self.bigquery_job_labels = bigquery_job_labels
+
+ def expand(self, pcoll):
+ job_name = pcoll.pipeline.options.view_as(GoogleCloudOptions).job_name
+ project = pcoll.pipeline.options.view_as(GoogleCloudOptions).project
+ unique_id = str(uuid.uuid4())[0:10]
+
+ try:
+ step_name = self.label
+ except AttributeError:
+ step_name = 'ReadAllFromBigQuery_%d' % ReadAllFromBigQuery.COUNTER
+ ReadAllFromBigQuery.COUNTER += 1
+
+ sources_to_read, cleanup_locations = (
+ pcoll
+ | beam.ParDo(
+ # TODO(pabloem): Make sure we have all necessary args.
Review comment:
removed. thanks Tyson!
##########
File path: sdks/python/apache_beam/io/gcp/bigquery_read_internal.py
##########
@@ -100,3 +121,290 @@ def process(self, unused_element, unused_signal,
gcs_locations):
)
return main_output
+
+
+class ReadFromBigQueryRequest:
+ """
+ Class that defines data to read from BQ.
+ """
+ def __init__(
+ self,
+ query: str = None,
+ use_standard_sql: bool = True,
+ table: Union[str, TableReference] = None,
+ flatten_results: bool = False):
+ """
+ Only one of query or table should be specified.
+
+ :param query: SQL query to fetch data.
+ :param use_standard_sql:
+ Specifies whether to use BigQuery's standard SQL dialect for this query.
+ The default value is :data:`True`. If set to :data:`False`,
+ the query will use BigQuery's legacy SQL dialect.
+ This parameter is ignored for table inputs.
+ :param table:
+ The ID of the table to read. The ID must contain only letters
+ ``a-z``, ``A-Z``, numbers ``0-9``, or underscores ``_``. Table should
+ define project and dataset (ex.: ``'PROJECT:DATASET.TABLE'``).
+ :param flatten_results:
+ Flattens all nested and repeated fields in the query results.
+ The default value is :data:`True`.
Review comment:
oops good catch. Thanks Tyson!
##########
File path: sdks/python/apache_beam/io/gcp/bigquery_read_internal.py
##########
@@ -100,3 +121,290 @@ def process(self, unused_element, unused_signal,
gcs_locations):
)
return main_output
+
+
+class ReadFromBigQueryRequest:
+ """
+ Class that defines data to read from BQ.
+ """
+ def __init__(
+ self,
+ query: str = None,
+ use_standard_sql: bool = True,
+ table: Union[str, TableReference] = None,
+ flatten_results: bool = False):
+ """
+ Only one of query or table should be specified.
+
+ :param query: SQL query to fetch data.
+ :param use_standard_sql:
+ Specifies whether to use BigQuery's standard SQL dialect for this query.
+ The default value is :data:`True`. If set to :data:`False`,
+ the query will use BigQuery's legacy SQL dialect.
+ This parameter is ignored for table inputs.
+ :param table:
+ The ID of the table to read. The ID must contain only letters
+ ``a-z``, ``A-Z``, numbers ``0-9``, or underscores ``_``. Table should
+ define project and dataset (ex.: ``'PROJECT:DATASET.TABLE'``).
+ :param flatten_results:
+ Flattens all nested and repeated fields in the query results.
+ The default value is :data:`True`.
+ """
+ self.flatten_results = flatten_results
+ self.query = query
+ self.use_standard_sql = use_standard_sql
+ self.table = table
+ self.validate()
+
+ # We use this internal object ID to generate BigQuery export directories.
+ self.obj_id = random.randint(0, 100000)
+
+ def validate(self):
+ if self.table is not None and self.query is not None:
+ raise ValueError(
+ 'Both a BigQuery table and a query were specified.'
+ ' Please specify only one of these.')
+ elif self.table is None and self.query is None:
+ raise ValueError('A BigQuery table or a query must be specified')
+ if self.table is not None:
+ if isinstance(self.table, str):
+ assert self.table.find('.'), (
+ 'Expected a table reference '
+ '(PROJECT:DATASET.TABLE or DATASET.TABLE) instead of %s'
+ % self.table)
+
+
+class _BigQueryReadSplit(beam.transforms.DoFn):
+ def __init__(
+ self,
+ options: PipelineOptions,
+ gcs_location: Union[str, ValueProvider] = None,
+ use_json_exports: bool = False,
+ bigquery_job_labels: Dict[str, str] = None,
+ step_name: str = None,
+ job_name: str = None,
+ unique_id: str = None,
+ kms_key: str = None,
+ project: str = None):
+ self.options = options
+ self.use_json_exports = use_json_exports
+ self.gcs_location = gcs_location
+ self.bigquery_job_labels = bigquery_job_labels or {}
+ self._step_name = step_name
+ self._job_name = job_name or 'BQ_READ_SPLIT'
+ self._source_uuid = unique_id
+ self.kms_key = kms_key
+ self.project = project
+ self.bq_io_metadata = None
+
+ def process(self, element: ReadFromBigQueryRequest, *args,
+ **kwargs) -> Iterable[BoundedSource]:
+ bq = bigquery_tools.BigQueryWrapper()
+
+ if element.query is not None:
+ self._setup_temporary_dataset(bq, element)
+ table_reference = self._execute_query(bq, element)
+ else:
+ assert element.table
+ table_reference = bigquery_tools.parse_table_reference(
+ element.table, project=self._get_project())
+
+ if not table_reference.projectId:
+ table_reference.projectId = self._get_project()
+
+ schema, metadata_list = self._export_files(bq, element, table_reference)
+ split_result = [
+ self._create_source(metadata.path, schema) for metadata in
metadata_list
+ ]
+
+ if element.query is not None:
+ bq.clean_up_temporary_dataset(self._get_project())
Review comment:
that's not possible in this case. We issue a table export in
export_files, and only after that's finished is that we can delete the dataset.
But I've moved the yield above.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]