svetakvsundhar commented on code in PR #17159:
URL: https://github.com/apache/beam/pull/17159#discussion_r931693810
##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -2459,19 +2466,55 @@ def __init__(
self.bigquery_dataset_labels = {
'type': 'bq_direct_read_' + str(uuid.uuid4())[0:10]
}
+ self.output_type = output_type
self._args = args
self._kwargs = kwargs
def expand(self, pcoll):
if self.method is ReadFromBigQuery.Method.EXPORT:
- return self._expand_export(pcoll)
+ output_pcollection = self._expand_export(pcoll)
+ return ReadFromBigQuery._expand_output_type_export(
+ self, output_pcollection)
elif self.method is ReadFromBigQuery.Method.DIRECT_READ:
- return self._expand_direct_read(pcoll)
+ output_pcollection = self._expand_direct_read(pcoll)
+ return ReadFromBigQuery._expand_output_type_direct_read(
+ self, output_pcollection)
else:
raise ValueError(
'The method to read from BigQuery must be either EXPORT'
'or DIRECT_READ.')
+ def _expand_output_type_export(self, output_pcollection):
+ if self.output_type == 'BEAM_ROWS':
+ start_idx = self._kwargs['table'].index(
+ ':') if ':' in self._kwargs['table'] else 0
+ end_idx = self._kwargs['table'].index('.')
+ return output_pcollection | beam.io.gcp.bigquery_schema_tools. \
+ convert_to_usertype(
+ beam.io.gcp.bigquery.bigquery_tools.BigQueryWrapper().get_table(
+ project_id=output_pcollection.pipeline.options.view_as(
+ GoogleCloudOptions).project,
+ dataset_id=str(self._kwargs['table']
+ [start_idx+1 if ':' in self._kwargs['table']
+ else 0:end_idx]),
+ table_id=str(self._kwargs['table']).rsplit(
+ '.', maxsplit=1)[-1]).schema)
+ elif self.output_type == 'PYTHON_DICT' or self.output_type is None:
+ return output_pcollection
+
+ def _expand_output_type_direct_read(self, output_pcollection):
+ if self.output_type == 'BEAM_ROWS':
+ table_details = bigquery_tools.parse_table_reference(
+ self._kwargs['table'])
+ return output_pcollection | beam.io.gcp.bigquery_schema_tools.\
+ convert_to_usertype(
+ beam.io.gcp.bigquery.bigquery_tools.BigQueryWrapper().get_table(
+ project_id=table_details.projectId,
+ dataset_id=table_details.datasetId,
+ table_id=table_details.tableId).schema)
+ elif self.output_type == 'PYTHON_DICT' or self.output_type is None:
+ return output_pcollection
Review Comment:
Yeah, this will work if you specify the table in the following format:
`PROJECT:DATASET.TABLE`, but it fails if you specify it as `DATASET.TABLE` and
supply the project field separately. Basically, in the latter case, it fails
because the project ID is stored as `None`.
For some reason `parse_table_reference` is not able to pick up the project
field. I brought this up with the IO team and we agreed to look further into it.
For the purposes of this PR, I'd be happy to re-use my `direct_read`
implementation, but document the format in which the project, table, and
dataset need to be specified in `ReadFromBigQuery`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]