MazrimT opened a new issue, #24460:
URL: https://github.com/apache/airflow/issues/24460
### Description
Today, `airflow.providers.google.cloud.operators.bigquery.BigQueryGetDataOperator`
only allows you to point to a specific dataset and table and specify how many
rows you want.
It already sets up a `BigQueryHook`, so it would be very easy to support a
custom query from a string as well.
It would also be very convenient to have an `as_dict` flag to return the
result as a list of dicts.
I am not an expert in Python, but here is my attempt at a modification of the
current code (from 8.0.0rc2):
``` python
# Imports used by this snippet; in the provider source file
# (airflow/providers/google/cloud/operators/bigquery.py) these, along with
# the ``BigQueryUIColors`` enum, are already available at module level.
from typing import TYPE_CHECKING, Dict, Optional, Sequence, Union

from airflow.models import BaseOperator
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook

if TYPE_CHECKING:
    from airflow.utils.context import Context


class BigQueryGetDataOperator(BaseOperator):
"""
Fetches the data from a BigQuery table (alternatively fetch data for
selected columns)
and returns data in a python list. The number of elements in the
returned list will
be equal to the number of rows fetched. Each element in the list will
again be a list
where element would represent the columns values for that row.
**Example Result**: ``[['Tony', '10'], ['Mike', '20'], ['Steve', '15']]``
.. seealso::
For more information on how to use this operator, take a look at the
guide:
:ref:`howto/operator:BigQueryGetDataOperator`
.. note::
If you pass fields to ``selected_fields`` which are in different
order than the
order of columns already in
BQ table, the data will still be in the order of BQ table.
For example if the BQ table has 3 columns as
``[A,B,C]`` and you pass 'B,A' in the ``selected_fields``
the data would still be of the form ``'A,B'``.
**Example**: ::
get_data = BigQueryGetDataOperator(
task_id='get_data_from_bq',
dataset_id='test_dataset',
table_id='Transaction_partitions',
max_results=100,
selected_fields='DATE',
gcp_conn_id='airflow-conn-id'
)
:param dataset_id: The dataset ID of the requested table. (templated)
:param table_id: The table ID of the requested table. (templated)
:param max_results: The maximum number of records (rows) to be fetched
from the table. (templated)
:param selected_fields: List of fields to return (comma-separated). If
unspecified, all fields are returned.
:param gcp_conn_id: (Optional) The connection ID used to connect to
Google Cloud.
:param delegate_to: The account to impersonate using domain-wide
delegation of authority,
if any. For this to work, the service account making the request
must have
domain-wide delegation enabled.
:param location: The location used for the operation.
:param impersonation_chain: Optional service account to impersonate
using short-term
credentials, or chained list of accounts required to get the
access_token
of the last account in the list, which will be impersonated in the
request.
If set as a string, the account must grant the originating account
the Service Account Token Creator IAM role.
If set as a sequence, the identities from the list must grant
Service Account Token Creator IAM role to the directly preceding
identity, with first
account from the list granting this role to the originating account
(templated).
:param query: (Optional) a sql query to execute instead of getting data
from specific dataset and table
:param as_dict: if the result should be returned as a list of
dictionaries. default to False
"""

    template_fields: Sequence[str] = (
        'dataset_id',
        'table_id',
        'max_results',
        'selected_fields',
        'impersonation_chain',
    )
    ui_color = BigQueryUIColors.QUERY.value

    def __init__(
        self,
        *,
        dataset_id: str,
        table_id: str,
        max_results: int = 100,
        selected_fields: Optional[str] = None,
        gcp_conn_id: str = 'google_cloud_default',
        delegate_to: Optional[str] = None,
        location: Optional[str] = None,
        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
        query: Optional[str] = None,
        as_dict: bool = False,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.dataset_id = dataset_id
        self.table_id = table_id
        self.max_results = int(max_results)
        self.selected_fields = selected_fields
        self.gcp_conn_id = gcp_conn_id
        self.delegate_to = delegate_to
        self.location = location
        self.impersonation_chain = impersonation_chain
        self.query = query
        self.as_dict = as_dict

    def execute(self, context: 'Context') -> list:
        self.log.info(
            'Fetching data from %s.%s, max results: %s',
            self.dataset_id,
            self.table_id,
            self.max_results,
        )
        hook = BigQueryHook(
            gcp_conn_id=self.gcp_conn_id,
            delegate_to=self.delegate_to,
            impersonation_chain=self.impersonation_chain,
            location=self.location,
        )
        if not self.query:
            if not self.selected_fields:
                # Default to all columns from the table schema.
                schema: Dict[str, list] = hook.get_schema(
                    dataset_id=self.dataset_id,
                    table_id=self.table_id,
                )
                if "fields" in schema:
                    self.selected_fields = ','.join(field["name"] for field in schema["fields"])
            rows = hook.list_rows(
                dataset_id=self.dataset_id,
                table_id=self.table_id,
                max_results=self.max_results,
                selected_fields=self.selected_fields,
            )
            if self.as_dict:
                # ``selected_fields`` is a comma-separated string, so split it
                # before zipping it with the row values.
                field_names = (self.selected_fields or '').split(',')
                table_data = [dict(zip(field_names, row.values())) for row in rows]
            else:
                table_data = [row.values() for row in rows]
        else:
            # ``BigQueryCursor.execute`` returns None, so run the query first
            # and then fetch from the cursor.
            cursor = hook.get_conn().cursor()
            cursor.execute(self.query)
            if self.as_dict:
                # Assumes the cursor exposes a DB-API ``description``; if it
                # does not, the column names would have to come from the
                # query job's schema instead.
                field_names = [field[0] for field in cursor.description]
                table_data = [dict(zip(field_names, row)) for row in cursor.fetchall()]
            else:
                table_data = list(cursor.fetchall())
        self.log.info('Total extracted rows: %s', len(table_data))
        return table_data
```
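For illustration, here is a minimal sketch of how the modified operator could
be used in a DAG. The `query` and `as_dict` parameters are the additions
proposed here and do not exist in the released operator, and the task id,
dataset, table, and field names are made up. Note that with the `__init__`
above, `dataset_id` and `table_id` are still required even when `query` is
set, so a real PR would presumably make them optional:
``` python
# Hypothetical usage of the proposed ``as_dict`` flag (not released behaviour).
get_data_as_dicts = BigQueryGetDataOperator(
    task_id='get_data_as_dicts',
    dataset_id='test_dataset',
    table_id='Transaction_partitions',
    max_results=100,
    selected_fields='DATE',
    as_dict=True,  # e.g. [{'DATE': '2022-06-14'}, ...] instead of [['2022-06-14'], ...]
    gcp_conn_id='airflow-conn-id',
)
```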
### Use case/motivation
This would simplify getting data from BigQuery into Airflow, instead of
having to first store the data in a separate table with `BigQueryInsertJob`
and then fetch that table.
It would also simplify handling the data, since `as_dict` behaves the same
way as many other database connectors in Python do.
### Related issues
_No response_
### Are you willing to submit a PR?
- [X] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)