tysonjh commented on a change in pull request #13170:
URL: https://github.com/apache/beam/pull/13170#discussion_r520185234
##########
File path: sdks/python/apache_beam/io/gcp/bigquery.py
##########
@@ -1919,3 +1890,77 @@ def file_path_to_remove(unused_elm):
*self._args,
**self._kwargs))
| _PassThroughThenCleanup(files_to_remove_pcoll))
+
+
+@experimental()
+class ReadAllFromBigQuery(PTransform):
+ """Read data from BigQuery.
+
+ PTransform:ReadAllFromBigQueryRequest->Rows
+
+ This PTransform uses a BigQuery export job to take a snapshot of the table
+ on GCS, and then reads from each produced JSON file.
+
+ It is recommended not to use this PTransform for streaming jobs on
+ GlobalWindow, since it will not be able to cleanup snapshots.
+
+ Args:
+ gcs_location (str): The name of the Google Cloud Storage
+ bucket where the extracted table should be written as a string. If
+ :data:`None`, then the temp_location parameter is used.
+ validate (bool): If :data:`True`, various checks will be done when source
+ gets initialized (e.g., is table present?).
+ kms_key (str): Experimental. Optional Cloud KMS key name for use when
+ creating new temporary tables.
+ """
+ COUNTER = 0
+
+ def __init__(
+ self,
+ gcs_location: Union[str, ValueProvider] = None,
+ validate: bool = False,
+ kms_key: str = None,
+ bigquery_job_labels: Dict[str, str] = None):
+ if gcs_location:
+ if not isinstance(gcs_location, (str, ValueProvider)):
+ raise TypeError(
+ '%s: gcs_location must be of type string'
+ ' or ValueProvider; got %r instead' %
+ (self.__class__.__name__, type(gcs_location)))
+
+ self.gcs_location = gcs_location
+ self.validate = validate
+ self.kms_key = kms_key
+ self.bigquery_job_labels = bigquery_job_labels
+
+ def expand(self, pcoll):
+ job_name = pcoll.pipeline.options.view_as(GoogleCloudOptions).job_name
+ project = pcoll.pipeline.options.view_as(GoogleCloudOptions).project
+ unique_id = str(uuid.uuid4())[0:10]
+
+ try:
+ step_name = self.label
+ except AttributeError:
+ step_name = 'ReadAllFromBigQuery_%d' % ReadAllFromBigQuery.COUNTER
+ ReadAllFromBigQuery.COUNTER += 1
+
+ sources_to_read, cleanup_locations = (
+ pcoll
+ | beam.ParDo(
+ # TODO(pabloem): Make sure we have all necessary args.
Review comment:
This should be resolved or attributed to a Jira issue.
##########
File path: sdks/python/apache_beam/io/gcp/bigquery.py
##########
@@ -79,6 +79,41 @@
`ReadFromBigQuery`, you can use the flag `use_json_exports` to export
data as JSON, and receive base64-encoded bytes.
+ReadAllFromBigQuery
+-------------------
+Beam 2.27.0 introduces a new transform called `ReadAllFromBigQuery` which
+allows you to define table and query reads from BigQuery at pipeline
+runtime.:::
+
+ read_requests = p | beam.Create([
+ ReadFromBigQueryRequest(query='SELECT * FROM mydataset.mytable'),
+ ReadFromBigQueryRequest(table='myproject.mydataset.mytable')])
+ results = read_requests | ReadAllFromBigQuery()
+
+A good application for this transform is in streaming pipelines to
+refresh a side input coming from BigQuery. This would work like so:::
+
+ side_input = (
+ p
+ | 'PeriodicImpulse' >> PeriodicImpulse(
+ first_timestamp, last_timestamp, interval, True)
+ | 'MapToReadRequest' >> beam.Map(
+ lambda x: BigQueryReadRequest(table='dataset.table'))
Review comment:
I see a few different names here,
`ReadAllFromBigQuery`
`ReadFromBigQueryRequest`
`BigQueryReadRequest`
I'm a bit confused by the differences and interaction between these classes.
If `ReadFromBigQueryRequest` is something users interact with it should not
be in an internal file (e.g. `bigquery_read_internal.py`). Is there a need to
expose that at all? Instead could it just be:
```
side_input = (
p
| 'PeriodicImpulse' >> PeriodicImpulse(...)
| beam.io.ReadAllFromBigQuery(table=...))
```
Though this would make the initial example of several requests being
included in a single ReadAll not possible. Is this something that needs to be
special cased, as opposed to say, using a flatten?
##########
File path: sdks/python/apache_beam/io/gcp/bigquery_read_internal.py
##########
@@ -100,3 +121,290 @@ def process(self, unused_element, unused_signal,
gcs_locations):
)
return main_output
+
+
+class ReadFromBigQueryRequest:
+ """
+ Class that defines data to read from BQ.
+ """
+ def __init__(
+ self,
+ query: str = None,
+ use_standard_sql: bool = True,
+ table: Union[str, TableReference] = None,
+ flatten_results: bool = False):
+ """
+ Only one of query or table should be specified.
+
+ :param query: SQL query to fetch data.
+ :param use_standard_sql:
+ Specifies whether to use BigQuery's standard SQL dialect for this query.
+ The default value is :data:`True`. If set to :data:`False`,
+ the query will use BigQuery's legacy SQL dialect.
+ This parameter is ignored for table inputs.
+ :param table:
+ The ID of the table to read. The ID must contain only letters
+ ``a-z``, ``A-Z``, numbers ``0-9``, or underscores ``_``. Table should
+ define project and dataset (ex.: ``'PROJECT:DATASET.TABLE'``).
+ :param flatten_results:
+ Flattens all nested and repeated fields in the query results.
+ The default value is :data:`True`.
+ """
+ self.flatten_results = flatten_results
+ self.query = query
+ self.use_standard_sql = use_standard_sql
+ self.table = table
+ self.validate()
+
+ # We use this internal object ID to generate BigQuery export directories.
+ self.obj_id = random.randint(0, 100000)
+
+ def validate(self):
+ if self.table is not None and self.query is not None:
+ raise ValueError(
+ 'Both a BigQuery table and a query were specified.'
+ ' Please specify only one of these.')
+ elif self.table is None and self.query is None:
+ raise ValueError('A BigQuery table or a query must be specified')
+ if self.table is not None:
+ if isinstance(self.table, str):
+ assert self.table.find('.'), (
+ 'Expected a table reference '
+ '(PROJECT:DATASET.TABLE or DATASET.TABLE) instead of %s'
+ % self.table)
+
+
+class _BigQueryReadSplit(beam.transforms.DoFn):
+ def __init__(
+ self,
+ options: PipelineOptions,
+ gcs_location: Union[str, ValueProvider] = None,
+ use_json_exports: bool = False,
+ bigquery_job_labels: Dict[str, str] = None,
+ step_name: str = None,
+ job_name: str = None,
+ unique_id: str = None,
+ kms_key: str = None,
+ project: str = None):
+ self.options = options
+ self.use_json_exports = use_json_exports
+ self.gcs_location = gcs_location
+ self.bigquery_job_labels = bigquery_job_labels or {}
+ self._step_name = step_name
+ self._job_name = job_name or 'BQ_READ_SPLIT'
+ self._source_uuid = unique_id
+ self.kms_key = kms_key
+ self.project = project
+ self.bq_io_metadata = None
+
+ def process(self, element: ReadFromBigQueryRequest, *args,
+ **kwargs) -> Iterable[BoundedSource]:
+ bq = bigquery_tools.BigQueryWrapper()
+
+ if element.query is not None:
+ self._setup_temporary_dataset(bq, element)
+ table_reference = self._execute_query(bq, element)
+ else:
+ assert element.table
+ table_reference = bigquery_tools.parse_table_reference(
+ element.table, project=self._get_project())
+
+ if not table_reference.projectId:
+ table_reference.projectId = self._get_project()
+
+ schema, metadata_list = self._export_files(bq, element, table_reference)
+ split_result = [
+ self._create_source(metadata.path, schema) for metadata in
metadata_list
+ ]
+
+ if element.query is not None:
+ bq.clean_up_temporary_dataset(self._get_project())
Review comment:
Can this be moved up to the other `if element.query` condition? That may
allow putting the `yield` into the for loop above, getting rid of the
intermediate `split_result` and avoiding the additional iteration.
##########
File path: sdks/python/apache_beam/io/gcp/bigquery_read_internal.py
##########
@@ -100,3 +121,290 @@ def process(self, unused_element, unused_signal,
gcs_locations):
)
return main_output
+
+
+class ReadFromBigQueryRequest:
+ """
+ Class that defines data to read from BQ.
+ """
+ def __init__(
+ self,
+ query: str = None,
+ use_standard_sql: bool = True,
+ table: Union[str, TableReference] = None,
+ flatten_results: bool = False):
+ """
+ Only one of query or table should be specified.
+
+ :param query: SQL query to fetch data.
+ :param use_standard_sql:
+ Specifies whether to use BigQuery's standard SQL dialect for this query.
+ The default value is :data:`True`. If set to :data:`False`,
+ the query will use BigQuery's legacy SQL dialect.
+ This parameter is ignored for table inputs.
+ :param table:
+ The ID of the table to read. The ID must contain only letters
+ ``a-z``, ``A-Z``, numbers ``0-9``, or underscores ``_``. Table should
+ define project and dataset (ex.: ``'PROJECT:DATASET.TABLE'``).
+ :param flatten_results:
+ Flattens all nested and repeated fields in the query results.
+ The default value is :data:`True`.
+ """
+ self.flatten_results = flatten_results
+ self.query = query
+ self.use_standard_sql = use_standard_sql
+ self.table = table
+ self.validate()
+
+ # We use this internal object ID to generate BigQuery export directories.
Review comment:
May be worth noting that there is also a UUID involved. I was worried
about collisions until I read on a bit further.
##########
File path: sdks/python/apache_beam/io/gcp/bigquery_read_internal.py
##########
@@ -100,3 +121,290 @@ def process(self, unused_element, unused_signal,
gcs_locations):
)
return main_output
+
+
+class ReadFromBigQueryRequest:
+ """
+ Class that defines data to read from BQ.
+ """
+ def __init__(
+ self,
+ query: str = None,
+ use_standard_sql: bool = True,
+ table: Union[str, TableReference] = None,
+ flatten_results: bool = False):
+ """
+ Only one of query or table should be specified.
+
+ :param query: SQL query to fetch data.
+ :param use_standard_sql:
+ Specifies whether to use BigQuery's standard SQL dialect for this query.
+ The default value is :data:`True`. If set to :data:`False`,
+ the query will use BigQuery's legacy SQL dialect.
+ This parameter is ignored for table inputs.
+ :param table:
+ The ID of the table to read. The ID must contain only letters
+ ``a-z``, ``A-Z``, numbers ``0-9``, or underscores ``_``. Table should
+ define project and dataset (ex.: ``'PROJECT:DATASET.TABLE'``).
+ :param flatten_results:
+ Flattens all nested and repeated fields in the query results.
+ The default value is :data:`True`.
Review comment:
The default here is `False`.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]