TheNeuralBit commented on code in PR #17159:
URL: https://github.com/apache/beam/pull/17159#discussion_r936025435
##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -2716,23 +2726,53 @@ def __init__(
if isinstance(gcs_location, str):
gcs_location = StaticValueProvider(str, gcs_location)
+ if self.output_type == 'BEAM_ROW' and self._kwargs.get('query',
+ None) is not None:
+ raise ValueError(
+ "Both a query and an output type of 'BEAM_ROW' were specified. "
+ "'BEAM_ROW' is not currently supported with queries.")
+
self.gcs_location = gcs_location
self.bigquery_dataset_labels = {
'type': 'bq_direct_read_' + str(uuid.uuid4())[0:10]
}
- self._args = args
- self._kwargs = kwargs
def expand(self, pcoll):
if self.method is ReadFromBigQuery.Method.EXPORT:
- return self._expand_export(pcoll)
+ output_pcollection = self._expand_export(pcoll)
+ return ReadFromBigQuery._expand_output_type(self, output_pcollection)
elif self.method is ReadFromBigQuery.Method.DIRECT_READ:
- return self._expand_direct_read(pcoll)
+ output_pcollection = self._expand_direct_read(pcoll)
+ return ReadFromBigQuery._expand_output_type(self, output_pcollection)
Review Comment:
Nit: both branches end with the same `_expand_output_type` call, so you could make that call just once after the `if`/`elif`, like:
```py
if self.method is ReadFromBigQuery.Method.EXPORT:
  output_pcollection = self._expand_export(pcoll)
elif self.method is ReadFromBigQuery.Method.DIRECT_READ:
  output_pcollection = self._expand_direct_read(pcoll)
else:
  raise ValueError(
      'The method to read from BigQuery must be either EXPORT '
      'or DIRECT_READ.')
return self._expand_output_type(output_pcollection)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]