judoole commented on code in PR #8868:
URL: https://github.com/apache/airflow/pull/8868#discussion_r1222505058


##########
airflow/providers/google/cloud/operators/bigquery.py:
##########
@@ -546,6 +548,11 @@ def __init__(self,
                 "the gcp_conn_id parameter.", DeprecationWarning, stacklevel=3)
             gcp_conn_id = bigquery_conn_id
 
+        warnings.warn(

Review Comment:
   @stiak No problem. I can at least give you the code for it here, so you can grab whatever you need. It also contains some extra trickery, like optional autolabelling, making params jinja templated, and loading `.sql` files manually, since the built-in `.sql` template resolution doesn't work within zip deploys. Anyways, enjoy:
   
   ```python
   import io
   import os
   import sys
   import zipfile
   from typing import Any, Dict, List, Optional, Sequence

   import jinja2
   from airflow.exceptions import AirflowException
   from airflow.models.taskinstance import Context
   from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
   from airflow.utils.decorators import apply_defaults
   from airflow.utils.file import ZIP_REGEX
   from google.cloud.bigquery.table import TableReference

   
   class BigQueryOperator(BigQueryInsertJobOperator):
       """
       Wrapper for BigQueryInsertJobOperator; contains all the common fields used
       throughout turbineflow. The class needs to be updated in order to use more
       parameters!

       Fields are mostly self explanatory, except:

       :param autolabel: If true and labels is None, it will automatically add sensible
           labels, such as the dag_id, task_id, and composer name.
       :type autolabel: bool
       :param sql: The SQL to execute.
           If the parameter ends with .sql, the query is read from that file. Remember
           that the file path is relative to the DAG file, and if a sql file is used,
           the sql parameter itself is not templated before the file is read; the
           content of the file, however, will be templated.

           In order to include sql files which reside in a python module, for example in
           the engine folder, you have to write a MANIFEST.in file and include the sql
           files in the package data. The MANIFEST.in file should contain the line
           ``recursive-include src *.sql`` (replace src with the folder where the sql
           files reside), and setup.py should pass ``include_package_data=True`` to the
           setup() function.
       :type sql: str
       """
   
       template_fields: Sequence[str] = (
           "destination_dataset_table",
           "project_id",
           *BigQueryInsertJobOperator.template_fields,
       )

       # We override template_ext, as BigQueryInsertJobOperator is not able to find
       # templates within zips, and we don't want it to try to recognize the .sql file
       # ending.
       template_ext: Sequence[str] = ()
   
       @apply_defaults
       def __init__(
               self,
               sql: str,
               destination_dataset_table: Optional[str] = None,
               project_id: Optional[str] = None,
               create_disposition: Optional[str] = None,
               write_disposition: Optional[str] = None,
               use_legacy_sql: Optional[bool] = False,
               priority: Optional[str] = None,
               maximum_bytes_billed: Optional[str] = None,
               time_partitioning: Optional[Dict[str, Any]] = None,
               cluster_fields: Optional[List[str]] = None,
               labels: Optional[Dict[str, Any]] = None,
               configuration: Optional[Dict[str, Any]] = None,
               autolabel: Optional[bool] = False,
               *args,
               **kwargs,
       ) -> None:
           if configuration:
               raise ValueError(
                   "We don't allow merging configs. Consider adding parameters to this "
                   "class, or use BigQueryInsertJobOperator directly."
               )
   
           self.project_id = project_id
           # Add the sql to the query part
           configuration = {
               "query": {
                   "query": sql
               }
           }
           self.destination_dataset_table = destination_dataset_table
           self.create_disposition = create_disposition
           self.write_disposition = write_disposition
           self.use_legacy_sql = use_legacy_sql
           self.priority = priority
           self.maximum_bytes_billed = maximum_bytes_billed
           self.time_partitioning = time_partitioning
           self.cluster_fields = cluster_fields
           self.labels = labels
           self.sql = sql
   
           # Add other parts
           if self.time_partitioning is not None:
               configuration["query"].update({"timePartitioning": self.time_partitioning})
           if self.write_disposition is not None:
               configuration["query"].update({"writeDisposition": self.write_disposition})
           if self.create_disposition is not None:
               configuration["query"].update({"createDisposition": self.create_disposition})
           if self.use_legacy_sql is not None:
               configuration["query"].update({"useLegacySql": self.use_legacy_sql})
           if self.priority is not None:
               configuration["query"].update({"priority": self.priority})
           if self.cluster_fields is not None:
               configuration["query"].update({"clustering": {"fields": self.cluster_fields}})
           if self.maximum_bytes_billed is not None:
               configuration["query"].update({"maximumBytesBilled": self.maximum_bytes_billed})
           if self.labels is not None:
               configuration["labels"] = self.labels
           if self.labels is None and autolabel:
               # Default labels; these jinja strings are rendered at runtime, since
               # configuration is a template field
               configuration["labels"] = {
                   "dag": "{{ dag.dag_id.replace('.', '__') }}",
                   "task": "{{ task.task_id.split('.')[-1] }}",
                   "composer": "{{ conf.get('webserver', 'web_server_name') }}",
               }
   
           super(BigQueryOperator, self).__init__(
               configuration=configuration,
               project_id=project_id,
               *args,
               **kwargs
           )
   
       def render_template_fields(self, context: Context, jinja_env: Optional[jinja2.Environment] = None) -> None:
           if self.sql.endswith(".sql"):
               # Since we mostly deploy our DAGs in zips and want to be able to read the
               # sql file from within the zip, we need to read the file ourselves and
               # replace the query with the contents of the file.
               # BigQueryInsertJobOperator is not able to do this, as it fails to find
               # the file.

               # Get the directory of the DAG in context
               dag_dir = os.path.dirname(context["dag"].full_filepath)
               # Use that to construct the path to the file, relative to the DAG file
               file_path = os.path.join(dag_dir, self.sql)

               # If the file is within a zip file, load the sql from the zip
               _, archive, filename = ZIP_REGEX.search(file_path).groups()
               if archive and zipfile.is_zipfile(archive):
                   with zipfile.ZipFile(archive, mode='r') as zf, io.TextIOWrapper(zf.open(filename)) as f:
                       self.configuration["query"]["query"] = f.read()
               # If the file path is relative to the DAG dir, load it with a classic open
               elif os.path.isfile(file_path):
                   with open(file_path, mode='r') as f:
                       self.configuration["query"]["query"] = f.read()
               # Otherwise, try to find it on sys.path. Usually this is the case when the
               # sql file lives in a python module.
               else:
                   for path in sys.path:
                       if os.path.isfile(os.path.join(path, self.sql)):
                           with open(os.path.join(path, self.sql), mode='r') as f:
                               self.configuration["query"]["query"] = f.read()
                           break
                   else:
                       raise AirflowException(f"Could not find file {self.sql}.")
   
           # We render the template fields twice. This is necessary in order to use
           # destinationTable, as it might be templated. It also allows using template
           # fields which themselves contain templated strings, for example the params
           # argument (which is a template field) where some of the params also contain
           # template strings.
           super().render_template_fields(context, jinja_env)

           if self.destination_dataset_table:
               self.configuration["query"].update(
                   {
                       "destinationTable": TableReference.from_string(
                           self.destination_dataset_table, default_project=self.project_id
                       ).to_api_repr()
                   }
               )

           return super().render_template_fields(context, jinja_env)
   ```
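   
   For reference, a rough usage sketch (purely illustrative; the dag id, task id, table and file names below are placeholders, not from our codebase):
   
   ```python
   from datetime import datetime

   from airflow import DAG

   with DAG("example_dag", start_date=datetime(2023, 1, 1), schedule_interval=None) as dag:
       load = BigQueryOperator(
           task_id="load_table",
           # Resolved relative to this DAG file, also when the DAG is deployed in a zip
           sql="sql/load_table.sql",
           destination_dataset_table="my-project.my_dataset.my_table",
           write_disposition="WRITE_TRUNCATE",
           # With no labels given, attaches the dag/task/composer labels automatically
           autolabel=True,
       )
   ```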
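   
   And the packaging bit mentioned in the docstring, as a sketch (assuming a src layout with the sql files under `src/`, and a MANIFEST.in containing `recursive-include src *.sql`; the package name is made up):
   
   ```python
   # setup.py
   from setuptools import find_packages, setup

   setup(
       name="my_dags",  # hypothetical package name
       package_dir={"": "src"},
       packages=find_packages("src"),
       # Picks up the non-python files (the *.sql) declared in MANIFEST.in
       include_package_data=True,
   )
   ```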



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
