TheNeuralBit commented on code in PR #17159:
URL: https://github.com/apache/beam/pull/17159#discussion_r931662874


##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -2434,6 +2440,7 @@ def __init__(
       gcs_location=None,
       method=None,
       use_native_datetime=False,

Review Comment:
   Just noticed this parameter. According to the docs by default DATETIME 
columns are returned as strings, if this parameter is True they're returned as 
Python datetime instances. We'll need to account for this in the schema we 
generate.



##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -2459,19 +2466,55 @@ def __init__(
     self.bigquery_dataset_labels = {
         'type': 'bq_direct_read_' + str(uuid.uuid4())[0:10]
     }
+    self.output_type = output_type
     self._args = args
     self._kwargs = kwargs
 
   def expand(self, pcoll):
     if self.method is ReadFromBigQuery.Method.EXPORT:
-      return self._expand_export(pcoll)
+      output_pcollection = self._expand_export(pcoll)
+      return ReadFromBigQuery._expand_output_type_export(
+          self, output_pcollection)
     elif self.method is ReadFromBigQuery.Method.DIRECT_READ:
-      return self._expand_direct_read(pcoll)
+      output_pcollection = self._expand_direct_read(pcoll)
+      return ReadFromBigQuery._expand_output_type_direct_read(
+          self, output_pcollection)
     else:
       raise ValueError(
           'The method to read from BigQuery must be either EXPORT'
           'or DIRECT_READ.')
 
+  def _expand_output_type_export(self, output_pcollection):
+    if self.output_type == 'BEAM_ROWS':
+      start_idx = self._kwargs['table'].index(
+          ':') if ':' in self._kwargs['table'] else 0
+      end_idx = self._kwargs['table'].index('.')
+      return output_pcollection | beam.io.gcp.bigquery_schema_tools. \
+          convert_to_usertype(
+          beam.io.gcp.bigquery.bigquery_tools.BigQueryWrapper().get_table(
+              project_id=output_pcollection.pipeline.options.view_as(
+                GoogleCloudOptions).project,
+              dataset_id=str(self._kwargs['table']
+                             [start_idx+1 if ':' in self._kwargs['table']
+                              else 0:end_idx]),
+              table_id=str(self._kwargs['table']).rsplit(
+                  '.', maxsplit=1)[-1]).schema)
+    elif self.output_type == 'PYTHON_DICT' or self.output_type is None:
+      return output_pcollection
+
+  def _expand_output_type_direct_read(self, output_pcollection):
+    if self.output_type == 'BEAM_ROWS':
+      table_details = bigquery_tools.parse_table_reference(
+          self._kwargs['table'])
+      return output_pcollection | beam.io.gcp.bigquery_schema_tools.\
+            convert_to_usertype(
+            beam.io.gcp.bigquery.bigquery_tools.BigQueryWrapper().get_table(
+                project_id=table_details.projectId,
+                dataset_id=table_details.datasetId,
+                table_id=table_details.tableId).schema)
+    elif self.output_type == 'PYTHON_DICT' or self.output_type is None:
+      return output_pcollection

Review Comment:
   Hm I'm not clear on why we need to have separate logic here. The docs for 
the table parameter indicate: 
   ```
   table (str, callable, ValueProvider): The ID of the table, or a callable
         that returns it. The ID must contain only letters ``a-z``, ``A-Z``,
         numbers ``0-9``, or underscores ``_``. If dataset argument is
         :data:`None` then the table argument must contain the entire table
         reference specified as: ``'DATASET.TABLE'``
         or ``'PROJECT:DATASET.TABLE'``. If it's a callable, it must receive one
         argument representing an element to be written to BigQuery, and return
         a TableReference, or a string table name as specified above.
   ```
   
   It looks like it can be of either form, independent of which read mode is 
used. Further, it looks like `parse_table_reference`, _should_ just handle all 
these variations if we pass in table, project, and dataset:
   
https://github.com/apache/beam/blob/d51b497fb229d75eef8b7baee98cdb817a592a58/sdks/python/apache_beam/io/gcp/bigquery_tools.py#L214-L235
   
   So couldn't we have one version of this logic that resolves the source table 
with `parse_table_reference`?



##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -2422,6 +2422,12 @@ class ReadFromBigQuery(PTransform):
       to run queries with INTERACTIVE priority. This option is ignored when
       reading from a table rather than a query. To learn more about query
       priority, see: https://cloud.google.com/bigquery/docs/running-queries

Review Comment:
   I looked over the other parameters here, I think there are a couple of cases 
that we will need to reject when they are used in conjunction with 
`output_type=BEAM_ROW`:
   - If `table` is a callable or a ValueProvider
   - If `query` is used instead of project/dataset/table
   
   In both of these cases we can't (easily) resolve the schema at pipeline 
construction time. I think as written now we will just fail anyway when either 
of these happen, but it won't be a helpful error. We should detect these cases 
and raise a nice error (and test that we do) and document the limitation.
   
   (In theory we should be able to determine the schema for the `query` case 
too, but that's a little trickier. I think it's a good item to leave for future 
work)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to