uranusjr commented on code in PR #26102:
URL: https://github.com/apache/airflow/pull/26102#discussion_r963309290
##########
airflow/providers/google/cloud/hooks/bigquery.py:
##########
@@ -2821,6 +2824,221 @@ def _get_query_result(self) -> Dict:
return query_results
+class BigQueryHookAsync(GoogleBaseHookAsync):
+ """BigQueryHookAsync inherits from GoogleBaseHookAsync class. Interacts
with Google BigQuery"""
+
+ sync_hook_class = BigQueryHook
+
+ async def get_job_instance(
+ self, project_id: Optional[str], job_id: Optional[str], session:
ClientSession
+ ) -> Job:
+ """Get the specified job resource by job ID and project ID."""
+ with await self.service_file_as_context() as f:
+ return Job(job_id=job_id, project=project_id, service_file=f,
session=cast(Session, session))
+
+ async def get_job_status(
+ self,
+ job_id: Optional[str],
+ project_id: Optional[str] = None,
+ ) -> Optional[str]:
+ """
+ Polls for job status asynchronously using gcloud-aio.
+
+ Note that an OSError is raised when Job results are still pending.
+ """
+ async with ClientSession() as s:
+ try:
+ self.log.info("Executing get_job_status...")
+ job_client = await self.get_job_instance(project_id, job_id, s)
+ job_status_response = await job_client.result(cast(Session, s))
+ if job_status_response:
+ job_status = "success"
+ except OSError:
+ job_status = "pending"
+ except Exception as e:
+ self.log.info("Query execution finished with errors...")
+ job_status = str(e)
+ return job_status
+
+ async def get_job_output(
+ self,
+ job_id: Optional[str],
+ project_id: Optional[str] = None,
+ ) -> Dict[str, Any]:
+ """Get the big query job output for the given job id asynchronously
using gcloud-aio."""
+ async with ClientSession() as session:
+ self.log.info("Executing get_job_output..")
+ job_client = await self.get_job_instance(project_id, job_id,
session)
+ job_query_response = await
job_client.get_query_results(cast(Session, session))
+ return job_query_response
+
+ def get_records(self, query_results: Dict[str, Any]) -> List[Any]:
+ """
+ Given the output query response from gcloud aio bigquery, convert the
response to records.
+
+ :param query_results: the results from a SQL query
+ """
+ buffer = []
+ for dict_row in query_results.get("rows", {}):
+ buffer.append([vs["v"] for vs in dict_row["f"]])
+ return buffer
+
+ def value_check(
+ self,
+ sql: str,
+ pass_value: Any,
+ records: List[Any],
+ tolerance: Optional[float] = None,
+ ) -> None:
+ """
+ Match a single query resulting row and tolerance with pass_value
+
+ :return: Raises AirflowException if there is no match.
Review Comment:
```suggestion
:raises: AirflowException if there is no match.
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]