This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 4c33f6bcf5 Merge deferrable BigQuery operators into existing ones (#26433)
4c33f6bcf5 is described below
commit 4c33f6bcf527448283a738ef11478b75ba339422
Author: Ćukasz Wyszomirski <[email protected]>
AuthorDate: Tue Sep 20 15:36:57 2022 +0200
Merge deferrable BigQuery operators into existing ones (#26433)
---
airflow/providers/google/cloud/hooks/bigquery.py | 10 +-
.../providers/google/cloud/operators/bigquery.py | 769 ++++++++-------------
airflow/providers/google/cloud/sensors/bigquery.py | 2 +-
airflow/providers/google/provider.yaml | 4 +-
.../operators/cloud/bigquery.rst | 22 +-
generated/provider_dependencies.json | 4 +-
.../providers/google/cloud/hooks/test_bigquery.py | 16 +-
.../google/cloud/operators/test_bigquery.py | 99 +--
.../bigquery/example_bigquery_queries_async.py | 130 ++--
9 files changed, 457 insertions(+), 599 deletions(-)
diff --git a/airflow/providers/google/cloud/hooks/bigquery.py
b/airflow/providers/google/cloud/hooks/bigquery.py
index d61634ae4c..fa823af439 100644
--- a/airflow/providers/google/cloud/hooks/bigquery.py
+++ b/airflow/providers/google/cloud/hooks/bigquery.py
@@ -103,8 +103,8 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
)
self.use_legacy_sql = use_legacy_sql
self.location = location
- self.running_job_id = None # type: Optional[str]
- self.api_resource_configs = api_resource_configs if
api_resource_configs else {} # type Dict
+ self.running_job_id: str | None = None
+ self.api_resource_configs: dict = api_resource_configs if
api_resource_configs else {}
self.labels = labels
self.credentials_path = "bigquery_hook_credentials.json"
@@ -2313,14 +2313,14 @@ class BigQueryBaseCursor(LoggingMixin):
self.use_legacy_sql = use_legacy_sql
if api_resource_configs:
_validate_value("api_resource_configs", api_resource_configs, dict)
- self.api_resource_configs = api_resource_configs if
api_resource_configs else {} # type Dict
+ self.api_resource_configs: dict = api_resource_configs if
api_resource_configs else {}
self.running_job_id = None # type: Optional[str]
self.location = location
self.num_retries = num_retries
self.labels = labels
self.hook = hook
- def create_empty_table(self, *args, **kwargs) -> None:
+ def create_empty_table(self, *args, **kwargs):
"""
This method is deprecated.
Please use
`airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.create_empty_table`
@@ -2372,7 +2372,7 @@ class BigQueryBaseCursor(LoggingMixin):
)
return self.hook.delete_dataset(*args, **kwargs)
- def create_external_table(self, *args, **kwargs) -> None:
+ def create_external_table(self, *args, **kwargs):
"""
This method is deprecated.
Please use
`airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.create_external_table`
diff --git a/airflow/providers/google/cloud/operators/bigquery.py
b/airflow/providers/google/cloud/operators/bigquery.py
index 1819909f0b..084205f4ed 100644
--- a/airflow/providers/google/cloud/operators/bigquery.py
+++ b/airflow/providers/google/cloud/operators/bigquery.py
@@ -179,6 +179,7 @@ class BigQueryCheckOperator(_BigQueryDbHookMixin,
SQLCheckOperator):
Service Account Token Creator IAM role to the directly preceding
identity, with first
account from the list granting this role to the originating account
(templated).
:param labels: a dictionary containing labels for the table, passed to
BigQuery
+ :param deferrable: Run operator in the deferrable mode
"""
template_fields: Sequence[str] = (
@@ -199,6 +200,7 @@ class BigQueryCheckOperator(_BigQueryDbHookMixin,
SQLCheckOperator):
location: str | None = None,
impersonation_chain: str | Sequence[str] | None = None,
labels: dict | None = None,
+ deferrable: bool = False,
**kwargs,
) -> None:
super().__init__(sql=sql, **kwargs)
@@ -208,6 +210,59 @@ class BigQueryCheckOperator(_BigQueryDbHookMixin,
SQLCheckOperator):
self.location = location
self.impersonation_chain = impersonation_chain
self.labels = labels
+ self.deferrable = deferrable
+
+ def _submit_job(
+ self,
+ hook: BigQueryHook,
+ job_id: str,
+ ) -> BigQueryJob:
+ """Submit a new job and get the job id for polling the status using
Trigger."""
+ configuration = {"query": {"query": self.sql}}
+
+ return hook.insert_job(
+ configuration=configuration,
+ project_id=hook.project_id,
+ location=self.location,
+ job_id=job_id,
+ nowait=True,
+ )
+
+ def execute(self, context: Context):
+ if not self.deferrable:
+ super().execute(context=context)
+ else:
+ hook = BigQueryHook(
+ gcp_conn_id=self.gcp_conn_id,
+ )
+ job = self._submit_job(hook, job_id="")
+ context["ti"].xcom_push(key="job_id", value=job.job_id)
+ self.defer(
+ timeout=self.execution_timeout,
+ trigger=BigQueryCheckTrigger(
+ conn_id=self.gcp_conn_id,
+ job_id=job.job_id,
+ project_id=hook.project_id,
+ ),
+ method_name="execute_complete",
+ )
+
+ def execute_complete(self, context: Context, event: dict[str, Any]) ->
None:
+ """
+ Callback for when the trigger fires - returns immediately.
+ Relies on trigger to throw an exception, otherwise it assumes
execution was
+ successful.
+ """
+ if event["status"] == "error":
+ raise AirflowException(event["message"])
+
+ records = event["records"]
+ if not records:
+ raise AirflowException("The query returned empty results")
+ elif not all(bool(r) for r in records):
+ raise AirflowException(f"Test
failed.\nQuery:\n{self.sql}\nResults:\n{records!s}")
+ self.log.info("Record: %s", event["records"])
+ self.log.info("Success.")
class BigQueryValueCheckOperator(_BigQueryDbHookMixin, SQLValueCheckOperator):
@@ -233,6 +288,7 @@ class BigQueryValueCheckOperator(_BigQueryDbHookMixin,
SQLValueCheckOperator):
Service Account Token Creator IAM role to the directly preceding
identity, with first
account from the list granting this role to the originating account
(templated).
:param labels: a dictionary containing labels for the table, passed to
BigQuery
+ :param deferrable: Run operator in the deferrable mode
"""
template_fields: Sequence[str] = (
@@ -256,6 +312,7 @@ class BigQueryValueCheckOperator(_BigQueryDbHookMixin,
SQLValueCheckOperator):
location: str | None = None,
impersonation_chain: str | Sequence[str] | None = None,
labels: dict | None = None,
+ deferrable: bool = False,
**kwargs,
) -> None:
super().__init__(sql=sql, pass_value=pass_value, tolerance=tolerance,
**kwargs)
@@ -264,6 +321,65 @@ class BigQueryValueCheckOperator(_BigQueryDbHookMixin,
SQLValueCheckOperator):
self.use_legacy_sql = use_legacy_sql
self.impersonation_chain = impersonation_chain
self.labels = labels
+ self.deferrable = deferrable
+
+ def _submit_job(
+ self,
+ hook: BigQueryHook,
+ job_id: str,
+ ) -> BigQueryJob:
+ """Submit a new job and get the job id for polling the status using
Triggerer."""
+ configuration = {
+ "query": {
+ "query": self.sql,
+ "useLegacySql": False,
+ }
+ }
+ if self.use_legacy_sql:
+ configuration["query"]["useLegacySql"] = self.use_legacy_sql
+
+ return hook.insert_job(
+ configuration=configuration,
+ project_id=hook.project_id,
+ location=self.location,
+ job_id=job_id,
+ nowait=True,
+ )
+
+ def execute(self, context: Context) -> None: # type: ignore[override]
+ if not self.deferrable:
+ super().execute(context=context)
+ else:
+ hook = BigQueryHook(gcp_conn_id=self.gcp_conn_id)
+
+ job = self._submit_job(hook, job_id="")
+ context["ti"].xcom_push(key="job_id", value=job.job_id)
+ self.defer(
+ timeout=self.execution_timeout,
+ trigger=BigQueryValueCheckTrigger(
+ conn_id=self.gcp_conn_id,
+ job_id=job.job_id,
+ project_id=hook.project_id,
+ sql=self.sql,
+ pass_value=self.pass_value,
+ tolerance=self.tol,
+ ),
+ method_name="execute_complete",
+ )
+
+ def execute_complete(self, context: Context, event: dict[str, Any]) ->
None:
+ """
+ Callback for when the trigger fires - returns immediately.
+ Relies on trigger to throw an exception, otherwise it assumes
execution was
+ successful.
+ """
+ if event["status"] == "error":
+ raise AirflowException(event["message"])
+ self.log.info(
+ "%s completed with response %s ",
+ self.task_id,
+ event["message"],
+ )
class BigQueryIntervalCheckOperator(_BigQueryDbHookMixin,
SQLIntervalCheckOperator):
@@ -300,6 +416,7 @@ class BigQueryIntervalCheckOperator(_BigQueryDbHookMixin,
SQLIntervalCheckOperat
Service Account Token Creator IAM role to the directly preceding
identity, with first
account from the list granting this role to the originating account
(templated).
:param labels: a dictionary containing labels for the table, passed to
BigQuery
+ :param deferrable: Run operator in the deferrable mode
"""
template_fields: Sequence[str] = (
@@ -324,6 +441,7 @@ class BigQueryIntervalCheckOperator(_BigQueryDbHookMixin,
SQLIntervalCheckOperat
location: str | None = None,
impersonation_chain: str | Sequence[str] | None = None,
labels: dict | None = None,
+ deferrable: bool = False,
**kwargs,
) -> None:
super().__init__(
@@ -339,6 +457,67 @@ class BigQueryIntervalCheckOperator(_BigQueryDbHookMixin,
SQLIntervalCheckOperat
self.location = location
self.impersonation_chain = impersonation_chain
self.labels = labels
+ self.deferrable = deferrable
+
+ def _submit_job(
+ self,
+ hook: BigQueryHook,
+ sql: str,
+ job_id: str,
+ ) -> BigQueryJob:
+ """Submit a new job and get the job id for polling the status using
Triggerer."""
+ configuration = {"query": {"query": sql}}
+ return hook.insert_job(
+ configuration=configuration,
+ project_id=hook.project_id,
+ location=self.location,
+ job_id=job_id,
+ nowait=True,
+ )
+
+ def execute(self, context: Context):
+ if not self.deferrable:
+ super().execute(context)
+ else:
+ hook = BigQueryHook(gcp_conn_id=self.gcp_conn_id)
+ self.log.info("Using ratio formula: %s", self.ratio_formula)
+
+ self.log.info("Executing SQL check: %s", self.sql1)
+ job_1 = self._submit_job(hook, sql=self.sql1, job_id="")
+ context["ti"].xcom_push(key="job_id", value=job_1.job_id)
+
+ self.log.info("Executing SQL check: %s", self.sql2)
+ job_2 = self._submit_job(hook, sql=self.sql2, job_id="")
+ self.defer(
+ timeout=self.execution_timeout,
+ trigger=BigQueryIntervalCheckTrigger(
+ conn_id=self.gcp_conn_id,
+ first_job_id=job_1.job_id,
+ second_job_id=job_2.job_id,
+ project_id=hook.project_id,
+ table=self.table,
+ metrics_thresholds=self.metrics_thresholds,
+ date_filter_column=self.date_filter_column,
+ days_back=self.days_back,
+ ratio_formula=self.ratio_formula,
+ ignore_zero=self.ignore_zero,
+ ),
+ method_name="execute_complete",
+ )
+
+ def execute_complete(self, context: Context, event: dict[str, Any]) ->
None:
+ """
+ Callback for when the trigger fires - returns immediately.
+ Relies on trigger to throw an exception, otherwise it assumes
execution was
+ successful.
+ """
+ if event["status"] == "error":
+ raise AirflowException(event["message"])
+ self.log.info(
+ "%s completed with response %s ",
+ self.task_id,
+ event["message"],
+ )
class BigQueryGetDataOperator(BaseOperator):
@@ -395,6 +574,7 @@ class BigQueryGetDataOperator(BaseOperator):
If set as a sequence, the identities from the list must grant
Service Account Token Creator IAM role to the directly preceding
identity, with first
account from the list granting this role to the originating account
(templated).
+ :param deferrable: Run operator in the deferrable mode
"""
template_fields: Sequence[str] = (
@@ -419,6 +599,7 @@ class BigQueryGetDataOperator(BaseOperator):
delegate_to: str | None = None,
location: str | None = None,
impersonation_chain: str | Sequence[str] | None = None,
+ deferrable: bool = False,
**kwargs,
) -> None:
super().__init__(**kwargs)
@@ -432,39 +613,97 @@ class BigQueryGetDataOperator(BaseOperator):
self.location = location
self.impersonation_chain = impersonation_chain
self.project_id = project_id
+ self.deferrable = deferrable
- def execute(self, context: Context) -> list:
- self.log.info(
- 'Fetching Data from %s.%s max results: %s', self.dataset_id,
self.table_id, self.max_results
+ def _submit_job(
+ self,
+ hook: BigQueryHook,
+ job_id: str,
+ ) -> BigQueryJob:
+ get_query = self.generate_query()
+ configuration = {"query": {"query": get_query}}
+ """Submit a new job and get the job id for polling the status using
Triggerer."""
+ return hook.insert_job(
+ configuration=configuration,
+ location=self.location,
+ project_id=hook.project_id,
+ job_id=job_id,
+ nowait=True,
)
+ def generate_query(self) -> str:
+ """
+ Generate a select query if selected fields are given or with *
+ for the given dataset and table id
+ """
+ query = "select "
+ if self.selected_fields:
+ query += self.selected_fields
+ else:
+ query += "*"
+ query += f" from {self.dataset_id}.{self.table_id} limit
{self.max_results}"
+ return query
+
+ def execute(self, context: Context):
hook = BigQueryHook(
gcp_conn_id=self.gcp_conn_id,
delegate_to=self.delegate_to,
impersonation_chain=self.impersonation_chain,
)
+ self.hook = hook
+
+ if not self.deferrable:
+ self.log.info(
+ 'Fetching Data from %s.%s max results: %s', self.dataset_id,
self.table_id, self.max_results
+ )
+ if not self.selected_fields:
+ schema: dict[str, list] = hook.get_schema(
+ dataset_id=self.dataset_id,
+ table_id=self.table_id,
+ )
+ if "fields" in schema:
+ self.selected_fields = ','.join([field["name"] for field
in schema["fields"]])
- if not self.selected_fields:
- schema: dict[str, list] = hook.get_schema(
+ rows = hook.list_rows(
dataset_id=self.dataset_id,
table_id=self.table_id,
+ max_results=self.max_results,
+ selected_fields=self.selected_fields,
+ location=self.location,
+ project_id=self.project_id,
)
- if "fields" in schema:
- self.selected_fields = ','.join([field["name"] for field in
schema["fields"]])
- rows = hook.list_rows(
- dataset_id=self.dataset_id,
- table_id=self.table_id,
- max_results=self.max_results,
- selected_fields=self.selected_fields,
- location=self.location,
- project_id=self.project_id,
+ self.log.info('Total extracted rows: %s', len(rows))
+
+ table_data = [row.values() for row in rows]
+ return table_data
+
+ job = self._submit_job(hook, job_id="")
+ self.job_id = job.job_id
+ context["ti"].xcom_push(key="job_id", value=self.job_id)
+ self.defer(
+ timeout=self.execution_timeout,
+ trigger=BigQueryGetDataTrigger(
+ conn_id=self.gcp_conn_id,
+ job_id=self.job_id,
+ dataset_id=self.dataset_id,
+ table_id=self.table_id,
+ project_id=hook.project_id,
+ ),
+ method_name="execute_complete",
)
- self.log.info('Total extracted rows: %s', len(rows))
+ def execute_complete(self, context: Context, event: dict[str, Any]) -> Any:
+ """
+ Callback for when the trigger fires - returns immediately.
+ Relies on trigger to throw an exception, otherwise it assumes
execution was
+ successful.
+ """
+ if event["status"] == "error":
+ raise AirflowException(event["message"])
- table_data = [row.values() for row in rows]
- return table_data
+ self.log.info("Total extracted rows: %s", len(event["records"]))
+ return event["records"]
class BigQueryExecuteQueryOperator(BaseOperator):
@@ -2099,6 +2338,7 @@ class BigQueryInsertJobOperator(BaseOperator):
:param cancel_on_kill: Flag which indicates whether cancel the hook's job
or not, when on_kill is called
:param result_retry: How to retry the `result` call that retrieves rows
:param result_timeout: The number of seconds to wait for `result` method
before using `result_retry`
+ :param deferrable: Run operator in the deferrable mode
"""
template_fields: Sequence[str] = (
@@ -2129,6 +2369,7 @@ class BigQueryInsertJobOperator(BaseOperator):
cancel_on_kill: bool = True,
result_retry: Retry = DEFAULT_RETRY,
result_timeout: float | None = None,
+ deferrable: bool = False,
**kwargs,
) -> None:
super().__init__(**kwargs)
@@ -2145,6 +2386,7 @@ class BigQueryInsertJobOperator(BaseOperator):
self.result_retry = result_retry
self.result_timeout = result_timeout
self.hook: BigQueryHook | None = None
+ self.deferrable = deferrable
def prepare_template(self) -> None:
# If .json is passed then we have to read the file
@@ -2200,7 +2442,11 @@ class BigQueryInsertJobOperator(BaseOperator):
location=self.location,
job_id=job_id,
)
- if job.state not in self.reattach_states:
+ if job.state in self.reattach_states:
+ # We are reattaching to a job
+ job._begin()
+ self._handle_job_error(job)
+ else:
# Same job configuration so we need force_rerun
raise AirflowException(
f"Job with id: {job_id} already exists and is in
{job.state} state. If you "
@@ -2235,10 +2481,36 @@ class BigQueryInsertJobOperator(BaseOperator):
BigQueryTableLink.persist(**persist_kwargs)
self.job_id = job.job_id
+ context["ti"].xcom_push(key="job_id", value=self.job_id)
# Wait for the job to complete
- job.result(timeout=self.result_timeout, retry=self.result_retry)
- self._handle_job_error(job)
+ if not self.deferrable:
+ job.result(timeout=self.result_timeout, retry=self.result_retry)
+ self._handle_job_error(job)
+
+ return self.job_id
+ self.defer(
+ timeout=self.execution_timeout,
+ trigger=BigQueryInsertJobTrigger(
+ conn_id=self.gcp_conn_id,
+ job_id=self.job_id,
+ project_id=self.project_id,
+ ),
+ method_name="execute_complete",
+ )
+ def execute_complete(self, context: Context, event: dict[str, Any]):
+ """
+ Callback for when the trigger fires - returns immediately.
+ Relies on trigger to throw an exception, otherwise it assumes
execution was
+ successful.
+ """
+ if event["status"] == "error":
+ raise AirflowException(event["message"])
+ self.log.info(
+ "%s completed with response %s ",
+ self.task_id,
+ event["message"],
+ )
return self.job_id
def on_kill(self) -> None:
@@ -2248,460 +2520,3 @@ class BigQueryInsertJobOperator(BaseOperator):
)
else:
self.log.info('Skipping to cancel job: %s:%s.%s', self.project_id,
self.location, self.job_id)
-
-
-class BigQueryInsertJobAsyncOperator(BigQueryInsertJobOperator, BaseOperator):
- """
- Starts a BigQuery job asynchronously, and returns job id.
- This operator works in the following way:
-
- - it calculates a unique hash of the job using job's configuration or uuid
if ``force_rerun`` is True
- - creates ``job_id`` in form of
- ``[provided_job_id |
airflow_{dag_id}_{task_id}_{exec_date}]_{uniqueness_suffix}``
- - submits a BigQuery job using the ``job_id``
- - if job with given id already exists then it tries to reattach to the job
if its not done and its
- state is in ``reattach_states``. If the job is done the operator will
raise ``AirflowException``.
-
- Using ``force_rerun`` will submit a new job every time without attaching
to already existing ones.
-
- For job definition see here:
-
- https://cloud.google.com/bigquery/docs/reference/v2/jobs
-
- :param configuration: The configuration parameter maps directly to
BigQuery's
- configuration field in the job object. For more details see
- https://cloud.google.com/bigquery/docs/reference/v2/jobs
- :param job_id: The ID of the job. It will be suffixed with hash of job
configuration
- unless ``force_rerun`` is True.
- The ID must contain only letters (a-z, A-Z), numbers (0-9),
underscores (_), or
- dashes (-). The maximum length is 1,024 characters. If not provided
then uuid will
- be generated.
- :param force_rerun: If True then operator will use hash of uuid as job id
suffix
- :param reattach_states: Set of BigQuery job's states in case of which we
should reattach
- to the job. Should be other than final states.
- :param project_id: Google Cloud Project where the job is running
- :param location: location the job is running
- :param gcp_conn_id: The connection ID used to connect to Google Cloud.
- :param delegate_to: The account to impersonate using domain-wide
delegation of authority,
- if any. For this to work, the service account making the request must
have
- domain-wide delegation enabled.
- :param impersonation_chain: Optional service account to impersonate using
short-term
- credentials, or chained list of accounts required to get the
access_token
- of the last account in the list, which will be impersonated in the
request.
- If set as a string, the account must grant the originating account
- the Service Account Token Creator IAM role.
- If set as a sequence, the identities from the list must grant
- Service Account Token Creator IAM role to the directly preceding
identity, with first
- account from the list granting this role to the originating account
(templated).
- :param cancel_on_kill: Flag which indicates whether cancel the hook's job
or not, when on_kill is called
- """
-
- def _submit_job(self, hook: BigQueryHook, job_id: str) -> BigQueryJob: #
type: ignore[override]
- """Submit a new job and get the job id for polling the status using
Triggerer."""
- return hook.insert_job(
- configuration=self.configuration,
- project_id=self.project_id,
- location=self.location,
- job_id=job_id,
- nowait=True,
- )
-
- def execute(self, context: Any) -> None:
- hook = BigQueryHook(gcp_conn_id=self.gcp_conn_id)
-
- self.hook = hook
- job_id = self.hook.generate_job_id(
- job_id=self.job_id,
- dag_id=self.dag_id,
- task_id=self.task_id,
- logical_date=context["logical_date"],
- configuration=self.configuration,
- force_rerun=self.force_rerun,
- )
-
- try:
- job = self._submit_job(hook, job_id)
- self._handle_job_error(job)
- except Conflict:
- # If the job already exists retrieve it
- job = hook.get_job(
- project_id=self.project_id,
- location=self.location,
- job_id=job_id,
- )
- if job.state in self.reattach_states:
- # We are reattaching to a job
- job._begin()
- self._handle_job_error(job)
- else:
- # Same job configuration so we need force_rerun
- raise AirflowException(
- f"Job with id: {job_id} already exists and is in
{job.state} state. If you "
- f"want to force rerun it consider setting
`force_rerun=True`."
- f"Or, if you want to reattach in this scenario add
{job.state} to `reattach_states`"
- )
-
- self.job_id = job.job_id
- context["ti"].xcom_push(key="job_id", value=self.job_id)
- self.defer(
- timeout=self.execution_timeout,
- trigger=BigQueryInsertJobTrigger(
- conn_id=self.gcp_conn_id,
- job_id=self.job_id,
- project_id=self.project_id,
- ),
- method_name="execute_complete",
- )
-
- def execute_complete(self, context: Any, event: dict[str, Any]) -> None:
- """
- Callback for when the trigger fires - returns immediately.
- Relies on trigger to throw an exception, otherwise it assumes
execution was
- successful.
- """
- if event["status"] == "error":
- raise AirflowException(event["message"])
- self.log.info(
- "%s completed with response %s ",
- self.task_id,
- event["message"],
- )
-
-
-class BigQueryCheckAsyncOperator(BigQueryCheckOperator):
- """
- BigQueryCheckAsyncOperator is asynchronous operator, submit the job and
check
- for the status in async mode by using the job id
- """
-
- def _submit_job(
- self,
- hook: BigQueryHook,
- job_id: str,
- ) -> BigQueryJob:
- """Submit a new job and get the job id for polling the status using
Trigger."""
- configuration = {"query": {"query": self.sql}}
-
- return hook.insert_job(
- configuration=configuration,
- project_id=hook.project_id,
- location=self.location,
- job_id=job_id,
- nowait=True,
- )
-
- def execute(self, context: Any) -> None:
- hook = BigQueryHook(
- gcp_conn_id=self.gcp_conn_id,
- )
- job = self._submit_job(hook, job_id="")
- context["ti"].xcom_push(key="job_id", value=job.job_id)
- self.defer(
- timeout=self.execution_timeout,
- trigger=BigQueryCheckTrigger(
- conn_id=self.gcp_conn_id,
- job_id=job.job_id,
- project_id=hook.project_id,
- ),
- method_name="execute_complete",
- )
-
- def execute_complete(self, context: Any, event: dict[str, Any]) -> None:
- """
- Callback for when the trigger fires - returns immediately.
- Relies on trigger to throw an exception, otherwise it assumes
execution was
- successful.
- """
- if event["status"] == "error":
- raise AirflowException(event["message"])
-
- records = event["records"]
- if not records:
- raise AirflowException("The query returned None")
- elif not all(bool(r) for r in records):
- raise AirflowException(f"Test
failed.\nQuery:\n{self.sql}\nResults:\n{records!s}")
- self.log.info("Record: %s", event["records"])
- self.log.info("Success.")
-
-
-class BigQueryGetDataAsyncOperator(BigQueryGetDataOperator):
- """
- Fetches the data from a BigQuery table (alternatively fetch data for
selected columns)
- and returns data in a python list. The number of elements in the returned
list will
- be equal to the number of rows fetched. Each element in the list will
again be a list
- where element would represent the columns values for that row.
-
- **Example Result**: ``[['Tony', '10'], ['Mike', '20'], ['Steve', '15']]``
-
- .. note::
- If you pass fields to ``selected_fields`` which are in different order
than the
- order of columns already in
- BQ table, the data will still be in the order of BQ table.
- For example if the BQ table has 3 columns as
- ``[A,B,C]`` and you pass 'B,A' in the ``selected_fields``
- the data would still be of the form ``'A,B'``.
-
- **Example**: ::
-
- get_data = BigQueryGetDataOperator(
- task_id='get_data_from_bq',
- dataset_id='test_dataset',
- table_id='Transaction_partitions',
- max_results=100,
- selected_fields='DATE',
- gcp_conn_id='airflow-conn-id'
- )
-
- :param dataset_id: The dataset ID of the requested table. (templated)
- :param table_id: The table ID of the requested table. (templated)
- :param max_results: The maximum number of records (rows) to be fetched
from the table. (templated)
- :param selected_fields: List of fields to return (comma-separated). If
- unspecified, all fields are returned.
- :param gcp_conn_id: (Optional) The connection ID used to connect to Google
Cloud.
- :param delegate_to: The account to impersonate using domain-wide
delegation of authority,
- if any. For this to work, the service account making the request must
have
- domain-wide delegation enabled.
- :param location: The location used for the operation.
- :param impersonation_chain: Optional service account to impersonate using
short-term
- credentials, or chained list of accounts required to get the
access_token
- of the last account in the list, which will be impersonated in the
request.
- If set as a string, the account must grant the originating account
- the Service Account Token Creator IAM role.
- If set as a sequence, the identities from the list must grant
- Service Account Token Creator IAM role to the directly preceding
identity, with first
- account from the list granting this role to the originating account
(templated).
- """
-
- def _submit_job(
- self,
- hook: BigQueryHook,
- job_id: str,
- configuration: dict[str, Any],
- ) -> BigQueryJob:
- """Submit a new job and get the job id for polling the status using
Triggerer."""
- return hook.insert_job(
- configuration=configuration,
- location=self.location,
- project_id=hook.project_id,
- job_id=job_id,
- nowait=True,
- )
-
- def generate_query(self) -> str:
- """
- Generate a select query if selected fields are given or with *
- for the given dataset and table id
- """
- selected_fields = self.selected_fields if self.selected_fields else "*"
- return f"select {selected_fields} from
{self.dataset_id}.{self.table_id} limit {self.max_results}"
-
- def execute(self, context: Any) -> None: # type: ignore[override]
- get_query = self.generate_query()
- configuration = {"query": {"query": get_query}}
-
- hook = BigQueryHook(
- gcp_conn_id=self.gcp_conn_id,
- delegate_to=self.delegate_to,
- location=self.location,
- impersonation_chain=self.impersonation_chain,
- )
-
- self.hook = hook
- job = self._submit_job(hook, job_id="", configuration=configuration)
- self.job_id = job.job_id
- context["ti"].xcom_push(key="job_id", value=self.job_id)
- self.defer(
- timeout=self.execution_timeout,
- trigger=BigQueryGetDataTrigger(
- conn_id=self.gcp_conn_id,
- job_id=self.job_id,
- dataset_id=self.dataset_id,
- table_id=self.table_id,
- project_id=hook.project_id,
- ),
- method_name="execute_complete",
- )
-
- def execute_complete(self, context: Any, event: dict[str, Any]) -> Any:
- """
- Callback for when the trigger fires - returns immediately.
- Relies on trigger to throw an exception, otherwise it assumes
execution was
- successful.
- """
- if event["status"] == "error":
- raise AirflowException(event["message"])
-
- self.log.info("Total extracted rows: %s", len(event["records"]))
- return event["records"]
-
-
-class BigQueryIntervalCheckAsyncOperator(BigQueryIntervalCheckOperator):
- """
- Checks asynchronously that the values of metrics given as SQL expressions
are within
- a certain tolerance of the ones from days_back before.
-
- This method constructs a query like so ::
- SELECT {metrics_threshold_dict_key} FROM {table}
- WHERE {date_filter_column}=<date>
-
- :param table: the table name
- :param days_back: number of days between ds and the ds we want to check
- against. Defaults to 7 days
- :param metrics_thresholds: a dictionary of ratios indexed by metrics, for
- example 'COUNT(*)': 1.5 would require a 50 percent or less difference
- between the current day, and the prior days_back.
- :param use_legacy_sql: Whether to use legacy SQL (true)
- or standard SQL (false).
- :param gcp_conn_id: (Optional) The connection ID used to connect to Google
Cloud.
- :param location: The geographic location of the job. See details at:
-
https://cloud.google.com/bigquery/docs/locations#specifying_your_location
- :param impersonation_chain: Optional service account to impersonate using
short-term
- credentials, or chained list of accounts required to get the
access_token
- of the last account in the list, which will be impersonated in the
request.
- If set as a string, the account must grant the originating account
- the Service Account Token Creator IAM role.
- If set as a sequence, the identities from the list must grant
- Service Account Token Creator IAM role to the directly preceding
identity, with first
- account from the list granting this role to the originating account
(templated).
- :param labels: a dictionary containing labels for the table, passed to
BigQuery
- """
-
- def _submit_job(
- self,
- hook: BigQueryHook,
- sql: str,
- job_id: str,
- ) -> BigQueryJob:
- """Submit a new job and get the job id for polling the status using
Triggerer."""
- configuration = {"query": {"query": sql}}
- return hook.insert_job(
- configuration=configuration,
- project_id=hook.project_id,
- location=self.location,
- job_id=job_id,
- nowait=True,
- )
-
- def execute(self, context: Any) -> None:
- """Execute the job in sync mode and defers the trigger with job id to
poll for the status"""
- hook = BigQueryHook(gcp_conn_id=self.gcp_conn_id)
- self.log.info("Using ratio formula: %s", self.ratio_formula)
-
- self.log.info("Executing SQL check: %s", self.sql1)
- job_1 = self._submit_job(hook, sql=self.sql1, job_id="")
- context["ti"].xcom_push(key="job_id", value=job_1.job_id)
-
- self.log.info("Executing SQL check: %s", self.sql2)
- job_2 = self._submit_job(hook, sql=self.sql2, job_id="")
- self.defer(
- timeout=self.execution_timeout,
- trigger=BigQueryIntervalCheckTrigger(
- conn_id=self.gcp_conn_id,
- first_job_id=job_1.job_id,
- second_job_id=job_2.job_id,
- project_id=hook.project_id,
- table=self.table,
- metrics_thresholds=self.metrics_thresholds,
- date_filter_column=self.date_filter_column,
- days_back=self.days_back,
- ratio_formula=self.ratio_formula,
- ignore_zero=self.ignore_zero,
- ),
- method_name="execute_complete",
- )
-
- def execute_complete(self, context: Any, event: dict[str, Any]) -> None:
- """
- Callback for when the trigger fires - returns immediately.
- Relies on trigger to throw an exception, otherwise it assumes
execution was
- successful.
- """
- if event["status"] == "error":
- raise AirflowException(event["message"])
-
- self.log.info(
- "%s completed with response %s ",
- self.task_id,
- event["status"],
- )
-
-
-class BigQueryValueCheckAsyncOperator(BigQueryValueCheckOperator):
- """
- Performs a simple value check using sql code.
-
- .. seealso::
- For more information on how to use this operator, take a look at the
guide:
- :ref:`howto/operator:BigQueryValueCheckOperator`
-
- :param sql: the sql to be executed
- :param use_legacy_sql: Whether to use legacy SQL (true)
- or standard SQL (false).
- :param gcp_conn_id: (Optional) The connection ID used to connect to Google
Cloud.
- :param location: The geographic location of the job. See details at:
-
https://cloud.google.com/bigquery/docs/locations#specifying_your_location
- :param impersonation_chain: Optional service account to impersonate using
short-term
- credentials, or chained list of accounts required to get the
access_token
- of the last account in the list, which will be impersonated in the
request.
- If set as a string, the account must grant the originating account
- the Service Account Token Creator IAM role.
- If set as a sequence, the identities from the list must grant
- Service Account Token Creator IAM role to the directly preceding
identity, with first
- account from the list granting this role to the originating account
(templated).
- :param labels: a dictionary containing labels for the table, passed to
BigQuery
- """
-
- def _submit_job(
- self,
- hook: BigQueryHook,
- job_id: str,
- ) -> BigQueryJob:
- """Submit a new job and get the job id for polling the status using
Triggerer."""
- configuration = {
- "query": {
- "query": self.sql,
- "useLegacySql": False,
- }
- }
- if self.use_legacy_sql:
- configuration["query"]["useLegacySql"] = self.use_legacy_sql
-
- return hook.insert_job(
- configuration=configuration,
- project_id=hook.project_id,
- location=self.location,
- job_id=job_id,
- nowait=True,
- )
-
- def execute(self, context: Any) -> None:
- hook = BigQueryHook(gcp_conn_id=self.gcp_conn_id)
-
- job = self._submit_job(hook, job_id="")
- context["ti"].xcom_push(key="job_id", value=job.job_id)
- self.defer(
- timeout=self.execution_timeout,
- trigger=BigQueryValueCheckTrigger(
- conn_id=self.gcp_conn_id,
- job_id=job.job_id,
- project_id=hook.project_id,
- sql=self.sql,
- pass_value=self.pass_value,
- tolerance=self.tol,
- ),
- method_name="execute_complete",
- )
-
- def execute_complete(self, context: Any, event: dict[str, Any]) -> None:
- """
- Callback for when the trigger fires - returns immediately.
- Relies on trigger to throw an exception, otherwise it assumes
execution was
- successful.
- """
- if event["status"] == "error":
- raise AirflowException(event["message"])
- self.log.info(
- "%s completed with response %s ",
- self.task_id,
- event["message"],
- )
diff --git a/airflow/providers/google/cloud/sensors/bigquery.py
b/airflow/providers/google/cloud/sensors/bigquery.py
index 598b556d7e..e18145f08c 100644
--- a/airflow/providers/google/cloud/sensors/bigquery.py
+++ b/airflow/providers/google/cloud/sensors/bigquery.py
@@ -15,7 +15,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-"""This module contains a Google Bigquery sensor."""
+"""This module contains Google BigQuery sensors."""
from __future__ import annotations
from datetime import timedelta
diff --git a/airflow/providers/google/provider.yaml
b/airflow/providers/google/provider.yaml
index b9f653da28..b877fd5660 100644
--- a/airflow/providers/google/provider.yaml
+++ b/airflow/providers/google/provider.yaml
@@ -64,8 +64,8 @@ dependencies:
# Introduced breaking changes across the board. Those libraries should be
upgraded soon
# TODO: Upgrade all Google libraries that are limited to <2.0.0
- PyOpenSSL
- - asgiref
- - gcloud-aio-bigquery
+ - asgiref>=3.5.2
+ - gcloud-aio-bigquery>=6.1.2
- gcloud-aio-storage
- google-ads>=15.1.1
- google-api-core>=2.7.0,<3.0.0
diff --git a/docs/apache-airflow-providers-google/operators/cloud/bigquery.rst
b/docs/apache-airflow-providers-google/operators/cloud/bigquery.rst
index 6672d2ce58..548c37ace2 100644
--- a/docs/apache-airflow-providers-google/operators/cloud/bigquery.rst
+++ b/docs/apache-airflow-providers-google/operators/cloud/bigquery.rst
@@ -1,3 +1,4 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
@@ -348,6 +349,14 @@ idempotency. If this parameter is not passed then uuid
will be used as ``job_id`
operator will try to submit a new job with this ``job_id```. If there's
already a job with such ``job_id``
then it will reattach to the existing job.
+You can also run this operator in deferrable mode for any of these actions:
+
+.. exampleinclude::
/../../tests/system/providers/google/cloud/bigquery/example_bigquery_queries_async.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_bigquery_insert_job_async]
+ :end-before: [END howto_operator_bigquery_insert_job_async]
+
Validate data
^^^^^^^^^^^^^
@@ -370,8 +379,7 @@ return ``False`` the check is failed and errors out.
:start-after: [START howto_operator_bigquery_check]
:end-before: [END howto_operator_bigquery_check]
-Below example shows the usage of
:class:`~airflow.providers.google.cloud.operators.bigquery.BigQueryCheckAsyncOperator`,
-which is the deferrable version of the operator
+This operator can also be run in deferrable mode:
.. exampleinclude::
/../../tests/system/providers/google/cloud/bigquery/example_bigquery_queries_async.py
:language: python
@@ -398,10 +406,7 @@ or numeric value. If numeric, you can also specify
``tolerance``.
:start-after: [START howto_operator_bigquery_value_check]
:end-before: [END howto_operator_bigquery_value_check]
-The below example shows how to use
-:class:`~airflow.providers.google.cloud.operators.bigquery.BigQueryValueCheckAsyncOperator`.
-Note that this is a deferrable operator which requires the Triggerer to be
running on your Airflow
-deployment.
+This operator can also be run in deferrable mode:
.. exampleinclude::
/../../tests/system/providers/google/cloud/bigquery/example_bigquery_queries_async.py
:language: python
@@ -425,10 +430,7 @@ tolerance of the ones from ``days_back`` before you can
either use
:start-after: [START howto_operator_bigquery_interval_check]
:end-before: [END howto_operator_bigquery_interval_check]
-The below example shows how to use
-:class:`~airflow.providers.google.cloud.operators.bigquery.BigQueryIntervalCheckAsyncOperator`.
-Note that this is a deferrable operator which requires the Triggerer to be
running on your Airflow
-deployment.
+This operator can also be run in deferrable mode:
.. exampleinclude::
/../../tests/system/providers/google/cloud/bigquery/example_bigquery_queries_async.py
:language: python
diff --git a/generated/provider_dependencies.json
b/generated/provider_dependencies.json
index bc266ec27a..bfbb0a463d 100644
--- a/generated/provider_dependencies.json
+++ b/generated/provider_dependencies.json
@@ -301,8 +301,8 @@
"PyOpenSSL",
"apache-airflow-providers-common-sql>=1.1.0",
"apache-airflow>=2.2.0",
- "asgiref",
- "gcloud-aio-bigquery",
+ "asgiref>=3.5.2",
+ "gcloud-aio-bigquery>=6.1.2",
"gcloud-aio-storage",
"google-ads>=15.1.1",
"google-api-core>=2.7.0,<3.0.0",
diff --git a/tests/providers/google/cloud/hooks/test_bigquery.py
b/tests/providers/google/cloud/hooks/test_bigquery.py
index 25888ee961..ae642c3583 100644
--- a/tests/providers/google/cloud/hooks/test_bigquery.py
+++ b/tests/providers/google/cloud/hooks/test_bigquery.py
@@ -55,13 +55,13 @@ CREDENTIALS = "bq-credentials"
DATASET_ID = "bq_dataset"
TABLE_ID = "bq_table"
PARTITION_ID = "20200101"
-VIEW_ID = 'bq_view'
+VIEW_ID = "bq_view"
JOB_ID = "1234"
-LOCATION = 'europe-north1'
+LOCATION = "europe-north1"
TABLE_REFERENCE_REPR = {
- 'tableId': TABLE_ID,
- 'datasetId': DATASET_ID,
- 'projectId': PROJECT_ID,
+ "tableId": TABLE_ID,
+ "datasetId": DATASET_ID,
+ "projectId": PROJECT_ID,
}
TABLE_REFERENCE = TableReference.from_api_repr(TABLE_REFERENCE_REPR)
@@ -890,7 +890,7 @@ class TestBigQueryHookMethods(_BigQueryBaseTestClass):
_, kwargs = mock_insert.call_args
assert kwargs["configuration"]['labels'] == {'label1': 'test1',
'label2': 'test2'}
- @pytest.mark.parametrize('nowait', [True, False])
+ @pytest.mark.parametrize("nowait", [True, False])
@mock.patch("airflow.providers.google.cloud.hooks.bigquery.QueryJob")
@mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.get_client")
def test_insert_job(self, mock_client, mock_query_job, nowait):
@@ -913,8 +913,8 @@ class TestBigQueryHookMethods(_BigQueryBaseTestClass):
mock_query_job.from_api_repr.assert_called_once_with(
{
- 'configuration': job_conf,
- 'jobReference': {'jobId': JOB_ID, 'projectId': PROJECT_ID,
'location': LOCATION},
+ "configuration": job_conf,
+ "jobReference": {"jobId": JOB_ID, "projectId": PROJECT_ID,
"location": LOCATION},
},
mock_client.return_value,
)
diff --git a/tests/providers/google/cloud/operators/test_bigquery.py
b/tests/providers/google/cloud/operators/test_bigquery.py
index 57c7915619..26159620cd 100644
--- a/tests/providers/google/cloud/operators/test_bigquery.py
+++ b/tests/providers/google/cloud/operators/test_bigquery.py
@@ -30,7 +30,6 @@ from airflow.models import DAG
from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import TaskInstance
from airflow.providers.google.cloud.operators.bigquery import (
- BigQueryCheckAsyncOperator,
BigQueryCheckOperator,
BigQueryConsoleIndexableLink,
BigQueryConsoleLink,
@@ -40,20 +39,16 @@ from airflow.providers.google.cloud.operators.bigquery
import (
BigQueryDeleteDatasetOperator,
BigQueryDeleteTableOperator,
BigQueryExecuteQueryOperator,
- BigQueryGetDataAsyncOperator,
BigQueryGetDataOperator,
BigQueryGetDatasetOperator,
BigQueryGetDatasetTablesOperator,
- BigQueryInsertJobAsyncOperator,
BigQueryInsertJobOperator,
- BigQueryIntervalCheckAsyncOperator,
BigQueryIntervalCheckOperator,
BigQueryPatchDatasetOperator,
BigQueryUpdateDatasetOperator,
BigQueryUpdateTableOperator,
BigQueryUpdateTableSchemaOperator,
BigQueryUpsertTableOperator,
- BigQueryValueCheckAsyncOperator,
BigQueryValueCheckOperator,
)
from airflow.providers.google.cloud.triggers.bigquery import (
@@ -69,9 +64,9 @@ from airflow.utils.types import DagRunType
from tests.test_utils.db import clear_db_dags, clear_db_runs,
clear_db_serialized_dags, clear_db_xcom
TASK_ID = 'test-bq-generic-operator'
-TEST_DATASET = 'test-dataset'
-TEST_DATASET_LOCATION = 'EU'
-TEST_GCP_PROJECT_ID = 'test-project'
+TEST_DATASET = "test-dataset"
+TEST_DATASET_LOCATION = "EU"
+TEST_GCP_PROJECT_ID = "test-project"
TEST_DELETE_CONTENTS = True
TEST_TABLE_ID = 'test-table-id'
TEST_GCS_BUCKET = 'test-bucket'
@@ -1142,12 +1137,13 @@ def test_bigquery_insert_job_operator_async(mock_hook):
}
mock_hook.return_value.insert_job.return_value =
MagicMock(job_id=real_job_id, error_result=False)
- op = BigQueryInsertJobAsyncOperator(
+ op = BigQueryInsertJobOperator(
task_id="insert_query_job",
configuration=configuration,
location=TEST_DATASET_LOCATION,
job_id=job_id,
project_id=TEST_GCP_PROJECT_ID,
+ deferrable=True,
)
with pytest.raises(TaskDeferred) as exc:
@@ -1168,12 +1164,13 @@ def test_bigquery_insert_job_operator_execute_failure():
}
job_id = "123456"
- operator = BigQueryInsertJobAsyncOperator(
+ operator = BigQueryInsertJobOperator(
task_id="insert_query_job",
configuration=configuration,
location=TEST_DATASET_LOCATION,
job_id=job_id,
project_id=TEST_GCP_PROJECT_ID,
+ deferrable=True,
)
with pytest.raises(AirflowException):
@@ -1212,12 +1209,13 @@ def
test_bigquery_insert_job_operator_execute_complete():
}
job_id = "123456"
- operator = BigQueryInsertJobAsyncOperator(
+ operator = BigQueryInsertJobOperator(
task_id="insert_query_job",
configuration=configuration,
location=TEST_DATASET_LOCATION,
job_id=job_id,
project_id=TEST_GCP_PROJECT_ID,
+ deferrable=True,
)
with mock.patch.object(operator.log, "info") as mock_log_info:
operator.execute_complete(
@@ -1249,13 +1247,14 @@ def
test_bigquery_insert_job_operator_with_job_id_generate(mock_hook):
)
mock_hook.return_value.get_job.return_value = job
- op = BigQueryInsertJobAsyncOperator(
+ op = BigQueryInsertJobOperator(
task_id="insert_query_job",
configuration=configuration,
location=TEST_DATASET_LOCATION,
job_id=job_id,
project_id=TEST_GCP_PROJECT_ID,
reattach_states={"PENDING"},
+ deferrable=True,
)
with pytest.raises(TaskDeferred):
@@ -1294,13 +1293,14 @@ def test_execute_reattach(mock_hook):
)
mock_hook.return_value.get_job.return_value = job
- op = BigQueryInsertJobAsyncOperator(
+ op = BigQueryInsertJobOperator(
task_id="insert_query_job",
configuration=configuration,
location=TEST_DATASET_LOCATION,
job_id=job_id,
project_id=TEST_GCP_PROJECT_ID,
reattach_states={"PENDING"},
+ deferrable=True,
)
with pytest.raises(TaskDeferred):
@@ -1316,7 +1316,7 @@ def test_execute_reattach(mock_hook):
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
-def test_execute_force_rerun(mock_hook):
+def test_execute_force_rerun_async(mock_hook):
job_id = "123456"
hash_ = "hash"
real_job_id = f"{job_id}_{hash_}"
@@ -1338,13 +1338,14 @@ def test_execute_force_rerun(mock_hook):
)
mock_hook.return_value.get_job.return_value = job
- op = BigQueryInsertJobAsyncOperator(
+ op = BigQueryInsertJobOperator(
task_id="insert_query_job",
configuration=configuration,
location=TEST_DATASET_LOCATION,
job_id=job_id,
project_id=TEST_GCP_PROJECT_ID,
reattach_states={"PENDING"},
+ deferrable=True,
)
with pytest.raises(AirflowException) as exc:
@@ -1377,10 +1378,11 @@ def test_bigquery_check_operator_async(mock_hook):
mock_hook.return_value.insert_job.return_value =
MagicMock(job_id=real_job_id, error_result=False)
- op = BigQueryCheckAsyncOperator(
+ op = BigQueryCheckOperator(
task_id="bq_check_operator_job",
sql="SELECT * FROM any",
location=TEST_DATASET_LOCATION,
+ deferrable=True,
)
with pytest.raises(TaskDeferred) as exc:
@@ -1392,8 +1394,11 @@ def test_bigquery_check_operator_async(mock_hook):
def test_bigquery_check_operator_execute_failure():
"""Tests that an AirflowException is raised in case of error event"""
- operator = BigQueryCheckAsyncOperator(
- task_id="bq_check_operator_execute_failure", sql="SELECT * FROM any",
location=TEST_DATASET_LOCATION
+ operator = BigQueryCheckOperator(
+ task_id="bq_check_operator_execute_failure",
+ sql="SELECT * FROM any",
+ location=TEST_DATASET_LOCATION,
+ deferrable=True,
)
with pytest.raises(AirflowException):
@@ -1403,14 +1408,17 @@ def test_bigquery_check_operator_execute_failure():
def test_bigquery_check_op_execute_complete_with_no_records():
"""Asserts that exception is raised with correct expected exception
message"""
- operator = BigQueryCheckAsyncOperator(
- task_id="bq_check_operator_execute_complete", sql="SELECT * FROM any",
location=TEST_DATASET_LOCATION
+ operator = BigQueryCheckOperator(
+ task_id="bq_check_operator_execute_complete",
+ sql="SELECT * FROM any",
+ location=TEST_DATASET_LOCATION,
+ deferrable=True,
)
with pytest.raises(AirflowException) as exc:
operator.execute_complete(context=None, event={"status": "success",
"records": None})
- expected_exception_msg = "The query returned None"
+ expected_exception_msg = "The query returned empty results"
assert str(exc.value) == expected_exception_msg
@@ -1420,8 +1428,11 @@ def
test_bigquery_check_op_execute_complete_with_non_boolean_records():
test_sql = "SELECT * FROM any"
- operator = BigQueryCheckAsyncOperator(
- task_id="bq_check_operator_execute_complete", sql=test_sql,
location=TEST_DATASET_LOCATION
+ operator = BigQueryCheckOperator(
+ task_id="bq_check_operator_execute_complete",
+ sql=test_sql,
+ location=TEST_DATASET_LOCATION,
+ deferrable=True,
)
expected_exception_msg = f"Test
failed.\nQuery:\n{test_sql}\nResults:\n{[20, False]!s}"
@@ -1435,8 +1446,11 @@ def
test_bigquery_check_op_execute_complete_with_non_boolean_records():
def test_bigquery_check_operator_execute_complete():
"""Asserts that logging occurs as expected"""
- operator = BigQueryCheckAsyncOperator(
- task_id="bq_check_operator_execute_complete", sql="SELECT * FROM any",
location=TEST_DATASET_LOCATION
+ operator = BigQueryCheckOperator(
+ task_id="bq_check_operator_execute_complete",
+ sql="SELECT * FROM any",
+ location=TEST_DATASET_LOCATION,
+ deferrable=True,
)
with mock.patch.object(operator.log, "info") as mock_log_info:
@@ -1447,28 +1461,30 @@ def test_bigquery_check_operator_execute_complete():
def test_bigquery_interval_check_operator_execute_complete():
"""Asserts that logging occurs as expected"""
- operator = BigQueryIntervalCheckAsyncOperator(
+ operator = BigQueryIntervalCheckOperator(
task_id="bq_interval_check_operator_execute_complete",
table="test_table",
metrics_thresholds={"COUNT(*)": 1.5},
location=TEST_DATASET_LOCATION,
+ deferrable=True,
)
with mock.patch.object(operator.log, "info") as mock_log_info:
operator.execute_complete(context=None, event={"status": "success",
"message": "Job completed"})
mock_log_info.assert_called_with(
- "%s completed with response %s ",
"bq_interval_check_operator_execute_complete", "success"
+ "%s completed with response %s ",
"bq_interval_check_operator_execute_complete", "Job completed"
)
def test_bigquery_interval_check_operator_execute_failure():
"""Tests that an AirflowException is raised in case of error event"""
- operator = BigQueryIntervalCheckAsyncOperator(
+ operator = BigQueryIntervalCheckOperator(
task_id="bq_interval_check_operator_execute_complete",
table="test_table",
metrics_thresholds={"COUNT(*)": 1.5},
location=TEST_DATASET_LOCATION,
+ deferrable=True,
)
with pytest.raises(AirflowException):
@@ -1487,11 +1503,12 @@ def
test_bigquery_interval_check_operator_async(mock_hook):
mock_hook.return_value.insert_job.return_value =
MagicMock(job_id=real_job_id, error_result=False)
- op = BigQueryIntervalCheckAsyncOperator(
+ op = BigQueryIntervalCheckOperator(
task_id="bq_interval_check_operator_execute_complete",
table="test_table",
metrics_thresholds={"COUNT(*)": 1.5},
location=TEST_DATASET_LOCATION,
+ deferrable=True,
)
with pytest.raises(TaskDeferred) as exc:
@@ -1514,12 +1531,13 @@ def
test_bigquery_get_data_operator_async_with_selected_fields(mock_hook):
mock_hook.return_value.insert_job.return_value =
MagicMock(job_id=real_job_id, error_result=False)
- op = BigQueryGetDataAsyncOperator(
+ op = BigQueryGetDataOperator(
task_id="get_data_from_bq",
dataset_id=TEST_DATASET,
- table_id=TEST_TABLE,
+ table_id=TEST_TABLE_ID,
max_results=100,
selected_fields="value,name",
+ deferrable=True,
)
with pytest.raises(TaskDeferred) as exc:
@@ -1540,11 +1558,12 @@ def
test_bigquery_get_data_operator_async_without_selected_fields(mock_hook):
mock_hook.return_value.insert_job.return_value =
MagicMock(job_id=real_job_id, error_result=False)
- op = BigQueryGetDataAsyncOperator(
+ op = BigQueryGetDataOperator(
task_id="get_data_from_bq",
dataset_id=TEST_DATASET,
- table_id=TEST_TABLE,
+ table_id=TEST_TABLE_ID,
max_results=100,
+ deferrable=True,
)
with pytest.raises(TaskDeferred) as exc:
@@ -1556,11 +1575,12 @@ def
test_bigquery_get_data_operator_async_without_selected_fields(mock_hook):
def test_bigquery_get_data_operator_execute_failure():
"""Tests that an AirflowException is raised in case of error event"""
- operator = BigQueryGetDataAsyncOperator(
+ operator = BigQueryGetDataOperator(
task_id="get_data_from_bq",
dataset_id=TEST_DATASET,
table_id="any",
max_results=100,
+ deferrable=True,
)
with pytest.raises(AirflowException):
@@ -1570,11 +1590,12 @@ def test_bigquery_get_data_operator_execute_failure():
def test_bigquery_get_data_op_execute_complete_with_records():
"""Asserts that exception is raised with correct expected exception
message"""
- operator = BigQueryGetDataAsyncOperator(
+ operator = BigQueryGetDataOperator(
task_id="get_data_from_bq",
dataset_id=TEST_DATASET,
table_id="any",
max_results=100,
+ deferrable=True,
)
with mock.patch.object(operator.log, "info") as mock_log_info:
@@ -1583,15 +1604,15 @@ def
test_bigquery_get_data_op_execute_complete_with_records():
def _get_value_check_async_operator(use_legacy_sql: bool = False):
- """Helper function to initialise BigQueryValueCheckOperatorAsync
operator"""
query = "SELECT COUNT(*) FROM Any"
pass_val = 2
- return BigQueryValueCheckAsyncOperator(
+ return BigQueryValueCheckOperator(
task_id="check_value",
sql=query,
pass_value=pass_val,
use_legacy_sql=use_legacy_sql,
+ deferrable=True,
)
@@ -1642,7 +1663,7 @@ def
test_bigquery_value_check_operator_execute_complete_failure():
def test_bigquery_value_check_missing_param(kwargs, expected):
"""Assert the exception if require param not pass to
BigQueryValueCheckOperatorAsync operator"""
with pytest.raises(AirflowException) as missing_param:
- BigQueryValueCheckAsyncOperator(**kwargs)
+ BigQueryValueCheckOperator(deferrable=True, **kwargs)
assert missing_param.value.args[0] == expected
@@ -1653,5 +1674,5 @@ def test_bigquery_value_check_empty():
"missing keyword arguments 'pass_value', 'sql'",
)
with pytest.raises(AirflowException) as missing_param:
- BigQueryValueCheckAsyncOperator(kwargs={})
+ BigQueryValueCheckOperator(deferrable=True, kwargs={})
assert (missing_param.value.args[0] == expected) or
(missing_param.value.args[0] == expected1)
diff --git
a/tests/system/providers/google/cloud/bigquery/example_bigquery_queries_async.py
b/tests/system/providers/google/cloud/bigquery/example_bigquery_queries_async.py
index cfe41d0eb7..ca9d13871f 100644
---
a/tests/system/providers/google/cloud/bigquery/example_bigquery_queries_async.py
+++
b/tests/system/providers/google/cloud/bigquery/example_bigquery_queries_async.py
@@ -25,25 +25,25 @@ from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
-from airflow.operators.empty import EmptyOperator
from airflow.providers.google.cloud.operators.bigquery import (
- BigQueryCheckAsyncOperator,
+ BigQueryCheckOperator,
BigQueryCreateEmptyDatasetOperator,
BigQueryCreateEmptyTableOperator,
BigQueryDeleteDatasetOperator,
- BigQueryGetDataAsyncOperator,
- BigQueryInsertJobAsyncOperator,
- BigQueryIntervalCheckAsyncOperator,
- BigQueryValueCheckAsyncOperator,
+ BigQueryGetDataOperator,
+ BigQueryInsertJobOperator,
+ BigQueryIntervalCheckOperator,
+ BigQueryValueCheckOperator,
)
from airflow.utils.trigger_rule import TriggerRule
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
-PROJECT_ID = os.getenv("SYSTEM_TESTS_GCP_PROJECT")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
+
DAG_ID = "bigquery_queries_async"
+
DATASET_NAME = f"dataset_{DAG_ID}_{ENV_ID}"
LOCATION = "us"
-EXECUTION_TIMEOUT = 6
TABLE_1 = "table1"
TABLE_2 = "table2"
@@ -62,15 +62,44 @@ INSERT_ROWS_QUERY = (
f"(42, 'fishy fish', '{INSERT_DATE}');"
)
+
+CONFIGURATION = {
+ "query": {
+ "query": f"""DECLARE success BOOL;
+ DECLARE size_bytes INT64;
+ DECLARE row_count INT64;
+ DECLARE DELAY_TIME DATETIME;
+ DECLARE WAIT STRING;
+ SET success = FALSE;
+
+ SELECT row_count = (SELECT row_count FROM {DATASET}.__TABLES__ WHERE
table_id='NON_EXISTING_TABLE');
+ IF row_count > 0 THEN
+ SELECT 'Table Exists!' as message, retry_count as retries;
+ SET success = TRUE;
+ ELSE
+ SELECT 'Table does not exist' as message, row_count;
+ SET WAIT = 'TRUE';
+ SET DELAY_TIME = DATETIME_ADD(CURRENT_DATETIME,INTERVAL 1 MINUTE);
+ WHILE WAIT = 'TRUE' DO
+ IF (DELAY_TIME < CURRENT_DATETIME) THEN
+ SET WAIT = 'FALSE';
+ END IF;
+ END WHILE;
+ END IF;""",
+ "useLegacySql": False,
+ }
+}
+
+
default_args = {
- "execution_timeout": timedelta(hours=EXECUTION_TIMEOUT),
- "retries": int(os.getenv("DEFAULT_TASK_RETRIES", 2)),
- "retry_delay":
timedelta(seconds=int(os.getenv("DEFAULT_RETRY_DELAY_SECONDS", 60))),
+ "execution_timeout": timedelta(hours=6),
+ "retries": 2,
+ "retry_delay": timedelta(seconds=60),
}
with DAG(
- dag_id="example_async_bigquery_queries_async",
- schedule=None,
+ dag_id=DAG_ID,
+ schedule='@once',
start_date=datetime(2022, 1, 1),
catchup=False,
default_args=default_args,
@@ -91,14 +120,15 @@ with DAG(
location=LOCATION,
)
- create_dataset >> create_table_1
-
delete_dataset = BigQueryDeleteDatasetOperator(
- task_id="delete_dataset", dataset_id=DATASET, delete_contents=True,
trigger_rule=TriggerRule.ALL_DONE
+ task_id="delete_dataset",
+ dataset_id=DATASET,
+ delete_contents=True,
+ trigger_rule=TriggerRule.ALL_DONE,
)
# [START howto_operator_bigquery_insert_job_async]
- insert_query_job = BigQueryInsertJobAsyncOperator(
+ insert_query_job = BigQueryInsertJobOperator(
task_id="insert_query_job",
configuration={
"query": {
@@ -107,11 +137,12 @@ with DAG(
}
},
location=LOCATION,
+ deferrable=True,
)
# [END howto_operator_bigquery_insert_job_async]
# [START howto_operator_bigquery_select_job_async]
- select_query_job = BigQueryInsertJobAsyncOperator(
+ select_query_job = BigQueryInsertJobOperator(
task_id="select_query_job",
configuration={
"query": {
@@ -120,32 +151,35 @@ with DAG(
}
},
location=LOCATION,
+ deferrable=True,
)
# [END howto_operator_bigquery_select_job_async]
# [START howto_operator_bigquery_value_check_async]
- check_value = BigQueryValueCheckAsyncOperator(
+ check_value = BigQueryValueCheckOperator(
task_id="check_value",
sql=f"SELECT COUNT(*) FROM {DATASET}.{TABLE_1}",
pass_value=2,
use_legacy_sql=False,
location=LOCATION,
+ deferrable=True,
)
# [END howto_operator_bigquery_value_check_async]
# [START howto_operator_bigquery_interval_check_async]
- check_interval = BigQueryIntervalCheckAsyncOperator(
+ check_interval = BigQueryIntervalCheckOperator(
task_id="check_interval",
table=f"{DATASET}.{TABLE_1}",
days_back=1,
metrics_thresholds={"COUNT(*)": 1.5},
use_legacy_sql=False,
location=LOCATION,
+ deferrable=True,
)
# [END howto_operator_bigquery_interval_check_async]
# [START howto_operator_bigquery_multi_query_async]
- bigquery_execute_multi_query = BigQueryInsertJobAsyncOperator(
+ bigquery_execute_multi_query = BigQueryInsertJobOperator(
task_id="execute_multi_query",
configuration={
"query": {
@@ -157,17 +191,19 @@ with DAG(
}
},
location=LOCATION,
+ deferrable=True,
)
# [END howto_operator_bigquery_multi_query_async]
# [START howto_operator_bigquery_get_data_async]
- get_data = BigQueryGetDataAsyncOperator(
+ get_data = BigQueryGetDataOperator(
task_id="get_data",
dataset_id=DATASET,
table_id=TABLE_1,
max_results=10,
selected_fields="value,name",
location=LOCATION,
+ deferrable=True,
)
# [END howto_operator_bigquery_get_data_async]
@@ -178,16 +214,17 @@ with DAG(
)
# [START howto_operator_bigquery_check_async]
- check_count = BigQueryCheckAsyncOperator(
+ check_count = BigQueryCheckOperator(
task_id="check_count",
sql=f"SELECT COUNT(*) FROM {DATASET}.{TABLE_1}",
use_legacy_sql=False,
location=LOCATION,
+ deferrable=True,
)
# [END howto_operator_bigquery_check_async]
# [START howto_operator_bigquery_execute_query_save_async]
- execute_query_save = BigQueryInsertJobAsyncOperator(
+ execute_query_save = BigQueryInsertJobOperator(
task_id="execute_query_save",
configuration={
"query": {
@@ -201,48 +238,31 @@ with DAG(
}
},
location=LOCATION,
+ deferrable=True,
)
# [END howto_operator_bigquery_execute_query_save_async]
- execute_long_running_query = BigQueryInsertJobAsyncOperator(
+ execute_long_running_query = BigQueryInsertJobOperator(
task_id="execute_long_running_query",
- configuration={
- "query": {
- "query": f"""DECLARE success BOOL;
- DECLARE size_bytes INT64;
- DECLARE row_count INT64;
- DECLARE DELAY_TIME DATETIME;
- DECLARE WAIT STRING;
- SET success = FALSE;
-
- SELECT row_count = (SELECT row_count FROM {DATASET}.__TABLES__ WHERE
table_id='NON_EXISTING_TABLE');
- IF row_count > 0 THEN
- SELECT 'Table Exists!' as message, retry_count as retries;
- SET success = TRUE;
- ELSE
- SELECT 'Table does not exist' as message, row_count;
- SET WAIT = 'TRUE';
- SET DELAY_TIME = DATETIME_ADD(CURRENT_DATETIME,INTERVAL 1 MINUTE);
- WHILE WAIT = 'TRUE' DO
- IF (DELAY_TIME < CURRENT_DATETIME) THEN
- SET WAIT = 'FALSE';
- END IF;
- END WHILE;
- END IF;""",
- "useLegacySql": False,
- }
- },
+ configuration=CONFIGURATION,
location=LOCATION,
+ deferrable=True,
)
- end = EmptyOperator(task_id="end")
-
- create_table_1 >> insert_query_job >> select_query_job >> check_count
+ create_dataset >> create_table_1 >> insert_query_job
+ insert_query_job >> select_query_job >> check_count
insert_query_job >> get_data >> get_data_result
insert_query_job >> execute_query_save >> bigquery_execute_multi_query
insert_query_job >> execute_long_running_query >> check_value >>
check_interval
[check_count, check_interval, bigquery_execute_multi_query,
get_data_result] >> delete_dataset
- [check_count, check_interval, bigquery_execute_multi_query,
get_data_result, delete_dataset] >> end
+
+ # ### Everything below this line is not part of example ###
+ # ### Just for system tests purpose ###
+ from tests.system.utils.watcher import watcher
+
+ # This test needs watcher in order to properly mark success/failure
+ # when "tearDown" task with trigger rule is part of the DAG
+ list(dag.tasks) >> watcher()
from tests.system.utils import get_test_run # noqa: E402