gemini-code-assist[bot] commented on code in PR #39160:
URL: https://github.com/apache/beam/pull/39160#discussion_r3494182560


##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -3004,6 +3019,12 @@ def _expand_output_type(self, output_pcollection):
     if self.output_type == 'PYTHON_DICT' or self.output_type is None:
       return output_pcollection
     elif self.output_type == 'BEAM_ROW':
+      # When a query is used, schema cannot be derived from an existing table.
+      # Use the user-supplied query_output_schema directly instead.
+      if self._kwargs.get('query', None) is not None:
+        return output_pcollection | bigquery_schema_tools.convert_to_usertype(
+            self.query_output_schema, self._kwargs.get('selected_fields', 
None))

Review Comment:
   ![critical](https://www.gstatic.com/codereviewagent/critical.svg)
   
   The `query_output_schema` parameter can be passed as a dictionary or a JSON 
string (as documented in the docstring and used in YAML). However, 
`bigquery_schema_tools.convert_to_usertype` expects a `TableSchema` object. 
Passing a dictionary or string directly will result in an `AttributeError` at 
runtime (e.g., `'dict' object has no attribute 'fields'`). Use 
`bigquery_tools.get_dict_table_schema` to normalize the schema before passing 
it to `convert_to_usertype`.
   
   ```suggestion
         if self._kwargs.get('query', None) is not None:
           user_schema = 
bigquery_tools.get_dict_table_schema(self.query_output_schema)
           return output_pcollection | 
bigquery_schema_tools.convert_to_usertype(
               user_schema, self._kwargs.get('selected_fields', None))
   ```



##########
sdks/python/apache_beam/yaml/yaml_io.py:
##########
@@ -119,18 +120,27 @@ def read_from_bigquery(
       specified field is a nested field, all the sub-fields in the field will 
be
       selected. The output field order is unrelated to the order of fields
       given here.
+    schema (dict): Required when query is set. A BigQuery schema describing
+      the query result columns, e.g.
+      ``{'fields': [{'name': 'col', 'type': 'STRING', 'mode': 'NULLABLE'}]}``.
+      Not applicable when reading from a table (schema is auto-derived).
   """
   if query is None:
     assert table is not None
   else:

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   If a user provides a `schema` parameter for a table-based read (where 
`query` is `None`), it will be silently ignored because `ReadFromBigQuery` only 
uses `query_output_schema` when a query is specified. To prevent silent 
failures and improve usability, raise a `ValueError` if `schema` is provided 
for a table-based read.
   
   ```python
     if query is None:
       assert table is not None
       if schema is not None:
         raise ValueError(
             "The 'schema' parameter is only supported when reading with a 
'query'. "
             "For table-based reads, the schema is automatically derived.")
     else:
   ```



##########
sdks/python/apache_beam/yaml/yaml_io_test.py:
##########
@@ -764,6 +764,52 @@ def expand(self, pcoll):
             ]))
 
 
+class ReadFromBigQueryTest(unittest.TestCase):
+  def test_query_without_schema_raises(self):
+    from apache_beam.yaml.yaml_io import read_from_bigquery
+    with self.assertRaisesRegex(ValueError, 'schema'):
+      read_from_bigquery(query='SELECT id FROM dataset.table')
+
+  def test_table_without_schema_ok(self):
+    # Table-based reads don't need schema — should not raise at construction
+    import unittest.mock as mock
+
+    from apache_beam.yaml.yaml_io import read_from_bigquery
+    with mock.patch('apache_beam.yaml.yaml_io.ReadFromBigQuery') as mock_rfbq:
+      mock_rfbq.return_value = mock.MagicMock()
+      # Should not raise
+      read_from_bigquery(table='project:dataset.table')
+      mock_rfbq.assert_called_once()
+      call_kwargs = mock_rfbq.call_args[1]
+      # schema should not be passed for table reads
+      self.assertIsNone(call_kwargs.get('query_output_schema'))
+

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Add a unit test to verify that providing a `schema` for a table-based read 
raises a `ValueError` as expected.
   
   ```suggestion
         self.assertIsNone(call_kwargs.get('query_output_schema'))
   
     def test_table_with_schema_raises(self):
       from apache_beam.yaml.yaml_io import read_from_bigquery
       with self.assertRaisesRegex(ValueError, 'only supported when reading 
with a'):
         read_from_bigquery(table='project:dataset.table', schema={'fields': 
[]})
   ```



##########
sdks/python/apache_beam/io/gcp/bigquery_test.py:
##########
@@ -777,6 +777,58 @@ def test_read_all_lineage(self):
             'bigquery:project2.dataset2.table2'
         ]))
 
+  def test_query_with_beam_row_requires_schema(self):
+    # Should raise when query + BEAM_ROW given but no query_output_schema
+    with self.assertRaisesRegex(ValueError, 'query_output_schema'):
+      ReadFromBigQuery(
+          query='SELECT id, name FROM dataset.table', output_type='BEAM_ROW')
+
+  def test_query_with_beam_row_and_schema_accepted(self):
+    # Should NOT raise when query_output_schema is provided
+    schema = {
+        'fields': [
+            {
+                'name': 'id', 'type': 'INTEGER', 'mode': 'NULLABLE'
+            },
+            {
+                'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'
+            },
+        ]
+    }
+    # Constructor should succeed — no error at init time
+    transform = ReadFromBigQuery(
+        query='SELECT id, name FROM dataset.table',
+        output_type='BEAM_ROW',
+        query_output_schema=schema)
+    self.assertEqual(transform.query_output_schema, schema)
+
+  def test_expand_output_type_uses_query_schema(self):
+    schema = {
+        'fields': [
+            {
+                'name': 'id', 'type': 'INTEGER', 'mode': 'NULLABLE'
+            },
+            {
+                'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'
+            },
+        ]
+    }
+    transform = ReadFromBigQuery(
+        query='SELECT id, name FROM dataset.table',
+        output_type='BEAM_ROW',
+        query_output_schema=schema)
+
+    with mock.patch.object(bigquery_tools.BigQueryWrapper,
+                           'get_table') as mock_get_table, \
+         mock.patch('apache_beam.io.gcp.bigquery.bigquery_schema_tools'
+                    '.convert_to_usertype') as mock_convert:
+      mock_convert.return_value = beam.Map(lambda x: x)
+      fake_pcoll = mock.MagicMock()
+      transform._expand_output_type(fake_pcoll)
+
+    mock_get_table.assert_not_called()
+    mock_convert.assert_called_once_with(schema, None)

Review Comment:
   ![critical](https://www.gstatic.com/codereviewagent/critical.svg)
   
   Since `query_output_schema` is now normalized to a `TableSchema` object 
using `bigquery_tools.get_dict_table_schema`, the mocked `convert_to_usertype` 
will be called with the converted `TableSchema` object rather than the raw 
dictionary. Update the assertion to reflect this conversion.
   
   ```suggestion
       mock_get_table.assert_not_called()
       mock_convert.assert_called_once_with(
           bigquery_tools.get_dict_table_schema(schema), None)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to