gemini-code-assist[bot] commented on code in PR #37416: URL: https://github.com/apache/beam/pull/37416#discussion_r2732973259
########## sdks/python/apache_beam/io/gcp/bigquery_tools.py: ########## @@ -359,11 +360,18 @@ class BigQueryWrapper(object): HISTOGRAM_METRIC_LOGGER = MetricLogger() - def __init__(self, client=None, temp_dataset_id=None, temp_table_ref=None): - self.client = client or BigQueryWrapper._bigquery_client(PipelineOptions()) - self.gcp_bq_client = client or gcp_bigquery.Client( - client_info=ClientInfo( - user_agent="apache-beam-%s" % apache_beam.__version__)) + def __init__( + self, + client=None, + temp_dataset_id=None, + temp_table_ref=None, + quota_project_id=None): + self.quota_project_id = quota_project_id + self.client = client or BigQueryWrapper._bigquery_client( + PipelineOptions(), quota_project_id=quota_project_id) + self.gcp_bq_client = ( + client if client else BigQueryWrapper._gcp_bigquery_client( + quota_project_id=quota_project_id)) Review Comment:  There's a potential bug here. `self.gcp_bq_client` is being initialized with `client` if it's provided. However, `client` is an `apitools` client (`bigquery.BigqueryV2`), while `self.gcp_bq_client` is expected to be a `google-cloud-bigquery` client. This will cause errors if a `client` is passed to `BigQueryWrapper` because the two client types have different APIs. This seems to be a pre-existing issue, but since you're modifying this code, it's a good opportunity to fix it. `self.gcp_bq_client` should always be initialized with a `google-cloud-bigquery` client, regardless of whether an `apitools` `client` is passed. ```python self.gcp_bq_client = BigQueryWrapper._gcp_bigquery_client( quota_project_id=quota_project_id) ``` ########## sdks/python/apache_beam/io/gcp/bigquery_tools.py: ########## @@ -1399,19 +1407,66 @@ def convert_row_to_dict(self, row, schema): @staticmethod def from_pipeline_options(pipeline_options: PipelineOptions): + """Create a BigQueryWrapper from pipeline options. + + Args: + pipeline_options: Pipeline options containing GCP configuration. 
+ The quota_project_id is read from GoogleCloudOptions if set. + """ + quota_project_id = None + if pipeline_options is not None: + quota_project_id = pipeline_options.view_as( + GoogleCloudOptions).quota_project_id return BigQueryWrapper( - client=BigQueryWrapper._bigquery_client(pipeline_options)) + client=BigQueryWrapper._bigquery_client(pipeline_options), + quota_project_id=quota_project_id) @staticmethod - def _bigquery_client(pipeline_options: PipelineOptions): + def _bigquery_client( + pipeline_options: PipelineOptions, quota_project_id: str = None): + """Create a BigQuery API client from pipeline options. + + Args: + pipeline_options: Pipeline options for credentials. + quota_project_id: Optional quota project ID. If not provided, will be + extracted from pipeline_options. + """ + credentials = auth.get_service_credentials(pipeline_options) + # Use explicit quota_project_id if provided, otherwise get from options + if quota_project_id is None and pipeline_options is not None: + quota_project_id = pipeline_options.view_as( + GoogleCloudOptions).quota_project_id + if quota_project_id: + credentials = auth.with_quota_project(credentials, quota_project_id) return bigquery.BigqueryV2( http=get_new_http(), - credentials=auth.get_service_credentials(pipeline_options), + credentials=credentials, response_encoding='utf8', additional_http_headers={ "user-agent": "apache-beam-%s" % apache_beam.__version__ }) + @staticmethod + def _gcp_bigquery_client(quota_project_id: str = None): + """Create a google-cloud-bigquery Client with optional quota project.""" + credentials = None + + if quota_project_id: + # Get default credentials and apply quota project + try: + import google.auth + from google.auth import exceptions as auth_exceptions + credentials, _ = google.auth.default() + if hasattr(credentials, 'with_quota_project'): + credentials = credentials.with_quota_project(quota_project_id) + except (auth_exceptions.DefaultCredentialsError, AttributeError): + pass # Fall 
back to default credentials Review Comment:  The `try...except...pass` block here silently ignores errors when trying to get credentials and apply the quota project. This can make debugging difficult if something goes wrong. It would be better to log a warning, similar to what's done in `_create_bq_storage_client` in `sdks/python/apache_beam/io/gcp/bigquery.py`. ```python except (auth_exceptions.DefaultCredentialsError, AttributeError) as e: _LOGGER.warning( 'Failed to apply quota project %s to gcp-bigquery client: %s. ' 'Falling back to default client.', quota_project_id, e) pass # Fall back to default credentials ``` ########## sdks/python/apache_beam/internal/gcp/auth_test.py: ########## @@ -132,5 +132,58 @@ def raise_(scopes=None): auth._LOGGER.removeHandler(loggerHandler) +class WithQuotaProjectTest(unittest.TestCase): + """Tests for with_quota_project function.""" + def test_with_quota_project_returns_credentials_unchanged_when_none(self): + """Test that None credentials are returned unchanged.""" + result = auth.with_quota_project(None, 'my-project') + self.assertIsNone(result) + + def test_with_quota_project_returns_credentials_unchanged_when_no_quota(self): + """Test that credentials are returned unchanged when + quota_project_id is None.""" + mock_creds = mock.MagicMock() + result = auth.with_quota_project(mock_creds, None) + self.assertEqual(result, mock_creds) + mock_creds.with_quota_project.assert_not_called() + + def test_with_quota_project_applies_quota_to_wrapped_credentials(self): + """Test that quota project is applied to wrapped credentials.""" + mock_inner_creds = mock.MagicMock() + mock_new_creds = mock.MagicMock() + mock_inner_creds.with_quota_project.return_value = mock_new_creds + + mock_adapter = mock.MagicMock() + mock_adapter.get_google_auth_credentials.return_value = mock_inner_creds + + result = auth.with_quota_project(mock_adapter, 'my-billing-project') + + mock_inner_creds.with_quota_project.assert_called_once_with( + 
+ 'my-billing-project') + # Result should be a new adapter wrapping the new credentials + self.assertIsNotNone(result) Review Comment:  The assertion `self.assertIsNotNone(result)` is quite weak. It doesn't verify that the returned object is of the correct type (`_ApitoolsCredentialsAdapter`) and that it wraps the new credentials object. You can make this test stronger by patching `auth._ApitoolsCredentialsAdapter` and asserting that it's called correctly with the new credentials. ```python @mock.patch('apache_beam.internal.gcp.auth._ApitoolsCredentialsAdapter') def test_with_quota_project_applies_quota_to_wrapped_credentials( self, mock_adapter_class): """Test that quota project is applied to wrapped credentials.""" mock_inner_creds = mock.MagicMock() mock_new_creds = mock.MagicMock() mock_inner_creds.with_quota_project.return_value = mock_new_creds mock_adapter = mock.MagicMock() mock_adapter.get_google_auth_credentials.return_value = mock_inner_creds mock_adapter_instance = mock.MagicMock() mock_adapter_class.return_value = mock_adapter_instance result = auth.with_quota_project(mock_adapter, 'my-billing-project') mock_inner_creds.with_quota_project.assert_called_once_with( 'my-billing-project') # Result should be a new adapter wrapping the new credentials mock_adapter_class.assert_called_once_with(mock_new_creds) self.assertEqual(result, mock_adapter_instance) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscribe@beam.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org
