judoole commented on code in PR #8868:
URL: https://github.com/apache/airflow/pull/8868#discussion_r1222505058
##########
airflow/providers/google/cloud/operators/bigquery.py:
##########
@@ -546,6 +548,11 @@ def __init__(self,
"the gcp_conn_id parameter.", DeprecationWarning, stacklevel=3)
gcp_conn_id = bigquery_conn_id
+ warnings.warn(
Review Comment:
@stiak No problem. I can at least give you the code for it here, so you can
grab whatever you need. It also contains some extra trickery, such as optional
autolabelling, making params jinja-templated, and supporting `.sql` files, since
that param doesn't work within zip deploys. Anyway, enjoy:
```python
import io
import os
import sys
import zipfile
from typing import Any, Dict, List, Optional, Sequence

import jinja2
from google.cloud.bigquery.table import TableReference

from airflow.exceptions import AirflowException
from airflow.models.taskinstance import Context
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.utils.decorators import apply_defaults
from airflow.utils.file import ZIP_REGEX
class BigQueryOperator(BigQueryInsertJobOperator):
    """
    Wrapper for BigQueryInsertJobOperator; contains all the common
    fields used throughout turbineflow.

    Need to update the class in order to use more parameters!

    Fields are mostly self explanatory except:

    :param autolabel: If True and ``labels`` is None, it will automatically
        add labels such as dag_id, task_id, and composer name.
    :type autolabel: bool
    :param sql: The SQL to execute. If the parameter ends with ``.sql``,
        it will be read from the file. Remember the file path is relative
        to the DAG file. If a sql file is used, the ``sql`` parameter is
        not templated before it tries to read the file, but the content
        of the file will be templated.

        In order to include sql files which reside in a python module
        (for example in the engine folder), you have to write a
        ``MANIFEST.in`` file and include the sql files in the
        package_data. The ``MANIFEST.in`` file should contain the line
        ``recursive-include src *.sql`` (replace ``src`` with the folder
        where the sql files reside). Also, ``setup.py`` should pass
        ``include_package_data=True`` to the ``setup()`` function.
    :type sql: str
    """

    template_fields: Sequence[str] = (
        "destination_dataset_table",
        "project_id",
        *BigQueryInsertJobOperator.template_fields,
    )
    # We override template_ext, as BigQueryInsertJobOperator is not able
    # to find templates within zips, and we don't want it to try to
    # recognize the .sql file ending.
    template_ext: Sequence[str] = ()

    @apply_defaults
    def __init__(
        self,
        sql: str,
        destination_dataset_table: Optional[str] = None,
        project_id: Optional[str] = None,
        create_disposition: Optional[str] = None,
        write_disposition: Optional[str] = None,
        use_legacy_sql: Optional[bool] = False,
        priority: Optional[str] = None,
        maximum_bytes_billed: Optional[str] = None,
        time_partitioning: Optional[Dict[str, Any]] = None,
        cluster_fields: Optional[List[str]] = None,
        labels: Optional[Dict[str, Any]] = None,
        configuration: Optional[Dict[str, Any]] = None,
        autolabel: Optional[bool] = False,
        *args,
        **kwargs,
    ) -> None:
        # This operator builds the job configuration itself; a caller-supplied
        # one would silently be clobbered, so reject it up front.
        if configuration:
            raise ValueError(
                "We don't allow merging configs, Consider adding parameters "
                "to class, or use BigQueryInsertJobOperator directly"
            )
        self.project_id = project_id
        self.destination_dataset_table = destination_dataset_table
        self.create_disposition = create_disposition
        self.write_disposition = write_disposition
        self.use_legacy_sql = use_legacy_sql
        self.priority = priority
        self.maximum_bytes_billed = maximum_bytes_billed
        self.time_partitioning = time_partitioning
        self.cluster_fields = cluster_fields
        self.labels = labels
        self.sql = sql

        # Assemble the BigQuery job configuration, adding only the parts
        # that were actually provided.
        configuration = {"query": {"query": sql}}
        query_config = configuration["query"]
        if self.time_partitioning is not None:
            query_config["timePartitioning"] = self.time_partitioning
        if self.write_disposition is not None:
            query_config["writeDisposition"] = self.write_disposition
        if self.create_disposition is not None:
            query_config["createDisposition"] = self.create_disposition
        if self.use_legacy_sql is not None:
            query_config["useLegacySql"] = self.use_legacy_sql
        if self.priority is not None:
            query_config["priority"] = self.priority
        if self.cluster_fields is not None:
            query_config["clustering"] = {"fields": self.cluster_fields}
        if self.maximum_bytes_billed is not None:
            query_config["maximumBytesBilled"] = self.maximum_bytes_billed
        if self.labels is not None:
            configuration["labels"] = self.labels
        if self.labels is None and autolabel:
            # Auto-generated labels; rendered later via templating.
            configuration["labels"] = {
                "dag": "{{ dag.dag_id.replace('.', '__') }}",
                "task": "{{ task.task_id.split('.')[-1] }}",
                "composer": "{{ conf.get('webserver', 'web_server_name') }}",
            }
        super().__init__(
            *args,
            configuration=configuration,
            project_id=project_id,
            **kwargs,
        )

    def render_template_fields(
        self, context: Context, jinja_env: Optional[jinja2.Environment] = None
    ) -> None:
        if self.sql.endswith(".sql"):
            # Since we mostly deploy our DAGs in zips and want to be able to
            # read the sql file from within the zip, we need to read the file
            # ourselves and replace the query with the contents of the file.
            # BigQueryInsertJobOperator is not able to do this, as it fails
            # to find the file.
            # Get the directory of the DAG in context and use it to construct
            # the path to the file, relative to the DAG file.
            dag_dir = os.path.dirname(context["dag"].full_filepath)
            file_path = os.path.join(dag_dir, self.sql)
            _, archive, filename = ZIP_REGEX.search(file_path).groups()
            if archive and zipfile.is_zipfile(archive):
                # The file lives inside a zip; close the archive handle when
                # done (the original version leaked it).
                with zipfile.ZipFile(archive, mode='r') as zf:
                    with io.TextIOWrapper(zf.open(filename)) as f:
                        self.configuration["query"]["query"] = f.read()
            elif os.path.isfile(file_path):
                # Plain file relative to the DAG dir; load it with open().
                with open(file_path, mode='r') as f:
                    self.configuration["query"]["query"] = f.read()
            else:
                # Otherwise, try to find it on sys.path. Usually this is the
                # case when the sql file is inside a python module.
                for path in sys.path:
                    candidate = os.path.join(path, self.sql)
                    if os.path.isfile(candidate):
                        with open(candidate, mode='r') as f:
                            self.configuration["query"]["query"] = f.read()
                        break
                else:
                    raise AirflowException(f"Could not find file {self.sql}.")
        # We render the template fields two times. This is necessary in order
        # to use destinationTable, as it might be templated. It also allows
        # template fields which contain templated strings themselves, e.g.
        # the params argument (a template argument) whose values contain
        # template strings.
        super().render_template_fields(context, jinja_env)
        if self.destination_dataset_table:
            self.configuration["query"]["destinationTable"] = TableReference.from_string(
                self.destination_dataset_table,
                default_project=self.project_id,
            ).to_api_repr()
        return super().render_template_fields(context, jinja_env)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]