TheNeuralBit commented on code in PR #17159:
URL: https://github.com/apache/beam/pull/17159#discussion_r931662874
##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -2434,6 +2440,7 @@ def __init__(
       gcs_location=None,
       method=None,
       use_native_datetime=False,
Review Comment:
Just noticed this parameter. According to the docs, DATETIME columns are
returned as strings by default; if this parameter is `True`, they're returned
as Python `datetime` instances. We'll need to account for this in the schema
we generate.
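For illustration, a minimal sketch of how the generated schema's field type
could branch on this flag (the helper name and mapping are my assumptions, not
code from this PR):
```python
import datetime

def datetime_field_type(use_native_datetime=False):
  # Assumed mapping: by default the read returns DATETIME values as
  # ISO-formatted strings; with use_native_datetime=True they arrive as
  # datetime.datetime instances, so the generated schema should match.
  return datetime.datetime if use_native_datetime else str
```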
##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -2459,19 +2466,55 @@ def __init__(
     self.bigquery_dataset_labels = {
         'type': 'bq_direct_read_' + str(uuid.uuid4())[0:10]
     }
+    self.output_type = output_type
     self._args = args
     self._kwargs = kwargs

   def expand(self, pcoll):
     if self.method is ReadFromBigQuery.Method.EXPORT:
-      return self._expand_export(pcoll)
+      output_pcollection = self._expand_export(pcoll)
+      return ReadFromBigQuery._expand_output_type_export(
+          self, output_pcollection)
     elif self.method is ReadFromBigQuery.Method.DIRECT_READ:
-      return self._expand_direct_read(pcoll)
+      output_pcollection = self._expand_direct_read(pcoll)
+      return ReadFromBigQuery._expand_output_type_direct_read(
+          self, output_pcollection)
     else:
       raise ValueError(
           'The method to read from BigQuery must be either EXPORT'
           'or DIRECT_READ.')
+
+  def _expand_output_type_export(self, output_pcollection):
+    if self.output_type == 'BEAM_ROWS':
+      start_idx = self._kwargs['table'].index(
+          ':') if ':' in self._kwargs['table'] else 0
+      end_idx = self._kwargs['table'].index('.')
+      return output_pcollection | beam.io.gcp.bigquery_schema_tools. \
+          convert_to_usertype(
+              beam.io.gcp.bigquery.bigquery_tools.BigQueryWrapper().get_table(
+                  project_id=output_pcollection.pipeline.options.view_as(
+                      GoogleCloudOptions).project,
+                  dataset_id=str(self._kwargs['table']
+                                 [start_idx+1 if ':' in self._kwargs['table']
+                                  else 0:end_idx]),
+                  table_id=str(self._kwargs['table']).rsplit(
+                      '.', maxsplit=1)[-1]).schema)
+    elif self.output_type == 'PYTHON_DICT' or self.output_type is None:
+      return output_pcollection
+
+  def _expand_output_type_direct_read(self, output_pcollection):
+    if self.output_type == 'BEAM_ROWS':
+      table_details = bigquery_tools.parse_table_reference(
+          self._kwargs['table'])
+      return output_pcollection | beam.io.gcp.bigquery_schema_tools.\
+          convert_to_usertype(
+              beam.io.gcp.bigquery.bigquery_tools.BigQueryWrapper().get_table(
+                  project_id=table_details.projectId,
+                  dataset_id=table_details.datasetId,
+                  table_id=table_details.tableId).schema)
+    elif self.output_type == 'PYTHON_DICT' or self.output_type is None:
+      return output_pcollection
Review Comment:
Hm, I'm not clear on why we need separate logic here. The docs for the
`table` parameter indicate:
```
table (str, callable, ValueProvider): The ID of the table, or a callable
that returns it. The ID must contain only letters ``a-z``, ``A-Z``,
numbers ``0-9``, or underscores ``_``. If dataset argument is
:data:`None` then the table argument must contain the entire table
reference specified as: ``'DATASET.TABLE'``
or ``'PROJECT:DATASET.TABLE'``. If it's a callable, it must receive one
argument representing an element to be written to BigQuery, and return
a TableReference, or a string table name as specified above.
```
It looks like it can be of either form, independent of which read mode is
used. Further, it looks like `parse_table_reference` _should_ just handle all
these variations if we pass in `table`, `project`, and `dataset`:
https://github.com/apache/beam/blob/d51b497fb229d75eef8b7baee98cdb817a592a58/sdks/python/apache_beam/io/gcp/bigquery_tools.py#L214-L235
So couldn't we have one version of this logic that resolves the source table
with `parse_table_reference`?
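For example, something along these lines (a rough sketch; the `_kwargs`
access and the `dataset`/`project` fallbacks are assumptions about this PR's
internals):
```python
from apache_beam.io.gcp import bigquery_tools

def _resolve_table_schema(self):
  # parse_table_reference accepts 'TABLE', 'DATASET.TABLE', and
  # 'PROJECT:DATASET.TABLE' forms, so one call covers both read modes.
  table_ref = bigquery_tools.parse_table_reference(
      self._kwargs['table'],
      dataset=self._kwargs.get('dataset'),
      project=self._kwargs.get('project'))
  return bigquery_tools.BigQueryWrapper().get_table(
      project_id=table_ref.projectId,
      dataset_id=table_ref.datasetId,
      table_id=table_ref.tableId).schema
```
Then `_expand_output_type_export` and `_expand_output_type_direct_read` could
both delegate to it.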
##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -2422,6 +2422,12 @@ class ReadFromBigQuery(PTransform):
       to run queries with INTERACTIVE priority. This option is ignored when
       reading from a table rather than a query. To learn more about query
       priority, see: https://cloud.google.com/bigquery/docs/running-queries
Review Comment:
I looked over the other parameters here, and I think there are a couple of
cases that we will need to reject when they are used in conjunction with
`output_type='BEAM_ROWS'`:
- If `table` is a callable or a ValueProvider
- If `query` is used instead of project/dataset/table
In both of these cases we can't (easily) resolve the schema at pipeline
construction time. I think as written now we will just fail anyway when either
of these happens, but it won't be a helpful error. We should detect these
cases, raise a nice error (and test that we do), and document the limitation.
(In theory we should be able to determine the schema for the `query` case
too, but that's a little trickier. I think it's a good item to leave for
future work.)
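To make the suggestion concrete, a hedged sketch of such a construction-time
check (the method name and message wording are placeholders, not the PR's
actual code):
```python
from apache_beam.options.value_provider import ValueProvider

def _validate_output_type(self):
  # Only the BEAM_ROWS output type needs the schema at construction time.
  if self.output_type != 'BEAM_ROWS':
    return
  table = self._kwargs.get('table')
  if callable(table) or isinstance(table, ValueProvider):
    raise TypeError(
        'output_type=BEAM_ROWS requires a static table reference; a '
        'callable or ValueProvider table cannot be resolved at pipeline '
        'construction time.')
  if self._kwargs.get('query') is not None:
    raise TypeError(
        'output_type=BEAM_ROWS is not supported with query, since the '
        'output schema cannot be determined at pipeline construction time.')
```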