This is an automated email from the ASF dual-hosted git repository.

shahar1 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 57141585e86 Add BigQuery routine operators and existence sensor (#65499)
57141585e86 is described below

commit 57141585e86f8e9dc43b696fa5a2e8439fe3931e
Author: Ashir Alam <[email protected]>
AuthorDate: Tue May 12 09:39:45 2026 -0400

    Add BigQuery routine operators and existence sensor (#65499)
---
 providers/google/docs/operators/cloud/bigquery.rst |   8 +
 .../docs/operators/cloud/bigquery_routines.rst     | 164 +++++++
 providers/google/provider.yaml                     |   1 +
 .../providers/google/cloud/hooks/bigquery.py       | 252 +++++++++++
 .../providers/google/cloud/operators/bigquery.py   | 492 +++++++++++++++++++++
 .../providers/google/cloud/sensors/bigquery.py     |  63 +++
 .../airflow/providers/google/get_provider_info.py  |   5 +-
 .../cloud/bigquery/example_bigquery_routines.py    | 219 +++++++++
 .../tests/unit/google/cloud/hooks/test_bigquery.py | 172 +++++++
 .../unit/google/cloud/operators/test_bigquery.py   | 183 ++++++++
 .../unit/google/cloud/sensors/test_bigquery.py     |  39 ++
 11 files changed, 1597 insertions(+), 1 deletion(-)

diff --git a/providers/google/docs/operators/cloud/bigquery.rst b/providers/google/docs/operators/cloud/bigquery.rst
index 41c49dd5e03..82f43c319dc 100644
--- a/providers/google/docs/operators/cloud/bigquery.rst
+++ b/providers/google/docs/operators/cloud/bigquery.rst
@@ -289,6 +289,14 @@ You can also use this operator to delete a materialized view.
     :start-after: [START howto_operator_bigquery_delete_materialized_view]
     :end-before: [END howto_operator_bigquery_delete_materialized_view]
 
+Manage routines
+^^^^^^^^^^^^^^^
+
+Airflow exposes the BigQuery routines API (user-defined functions, stored
+procedures, and table-valued functions) through a small set of dedicated
+operators and a sensor. See :doc:`bigquery_routines` for the full guide with
+examples for each routine type.
+
 .. _howto/operator:BigQueryInsertJobOperator:
 
 Execute BigQuery jobs
diff --git a/providers/google/docs/operators/cloud/bigquery_routines.rst b/providers/google/docs/operators/cloud/bigquery_routines.rst
new file mode 100644
index 00000000000..1cd317f438f
--- /dev/null
+++ b/providers/google/docs/operators/cloud/bigquery_routines.rst
@@ -0,0 +1,164 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+Google Cloud BigQuery Routines Operators
+========================================
+
+`BigQuery routines <https://cloud.google.com/bigquery/docs/routines>`__ are
+dataset-scoped resources that encapsulate logic you can reuse from SQL:
+
+* **Scalar user-defined functions** (SQL or JavaScript)
+* **Stored procedures** (SQL or Apache Spark)
+* **Table-valued functions** (SQL)
+* **User-defined aggregate functions** (SQL)
+* **Remote functions** backed by Cloud Run / Cloud Functions
+
+Airflow exposes the BigQuery routines API so your DAG can own both the routine
+definitions and the pipeline that depends on them, instead of embedding
+``CREATE FUNCTION`` / ``CREATE PROCEDURE`` DDL in a query job.
+
+Prerequisite Tasks
+^^^^^^^^^^^^^^^^^^
+
+.. include:: /operators/_partials/prerequisite_tasks.rst
+
+.. _howto/operator:BigQueryCreateRoutineOperator:
+
+Create a routine
+^^^^^^^^^^^^^^^^
+
+Use :class:`~airflow.providers.google.cloud.operators.bigquery.BigQueryCreateRoutineOperator`
+to create any routine type. Routine fields mirror the BigQuery REST API's
+``Routine`` resource. Pass them individually as keyword arguments, or pass the
+complete resource via ``routine_resource``.
+
+The ``if_exists`` argument controls collision behavior:
+
+* ``"fail"`` (default) — raise when the routine already exists.
+* ``"skip"`` — leave the existing routine in place and return it.
+* ``"replace"`` — delete the existing routine, then create the new one.
+
+Scalar SQL UDF:
+
+.. exampleinclude:: /../../google/tests/system/google/cloud/bigquery/example_bigquery_routines.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_bigquery_create_scalar_routine]
+    :end-before: [END howto_operator_bigquery_create_scalar_routine]
+
+Stored procedure:
+
+.. exampleinclude:: /../../google/tests/system/google/cloud/bigquery/example_bigquery_routines.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_bigquery_create_procedure_routine]
+    :end-before: [END howto_operator_bigquery_create_procedure_routine]
+
+Table-valued function:
+
+.. exampleinclude:: /../../google/tests/system/google/cloud/bigquery/example_bigquery_routines.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_bigquery_create_tvf_routine]
+    :end-before: [END howto_operator_bigquery_create_tvf_routine]
+
+.. _howto/operator:BigQueryUpdateRoutineOperator:
+
+Update a routine
+^^^^^^^^^^^^^^^^
+
+Use :class:`~airflow.providers.google.cloud.operators.bigquery.BigQueryUpdateRoutineOperator`
+to patch selected fields of an existing routine. Only the fields listed in
+``fields`` are updated; any listed field that is unset in ``routine_resource``
+is cleared on the server.
+
+.. exampleinclude:: /../../google/tests/system/google/cloud/bigquery/example_bigquery_routines.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_bigquery_update_routine]
+    :end-before: [END howto_operator_bigquery_update_routine]
+
+.. _howto/operator:BigQueryGetRoutineOperator:
+
+Fetch a routine
+^^^^^^^^^^^^^^^
+
+Use :class:`~airflow.providers.google.cloud.operators.bigquery.BigQueryGetRoutineOperator`
+to read a routine's metadata. The operator pushes the serialized resource to
+XCom.
+
+.. exampleinclude:: /../../google/tests/system/google/cloud/bigquery/example_bigquery_routines.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_bigquery_get_routine]
+    :end-before: [END howto_operator_bigquery_get_routine]
+
+.. _howto/operator:BigQueryListRoutinesOperator:
+
+List routines
+^^^^^^^^^^^^^
+
+Use :class:`~airflow.providers.google.cloud.operators.bigquery.BigQueryListRoutinesOperator`
+to list all routines in a dataset. Only a subset of each routine's fields is
+returned; use ``BigQueryGetRoutineOperator`` for the complete resource.
+
+.. exampleinclude:: /../../google/tests/system/google/cloud/bigquery/example_bigquery_routines.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_bigquery_list_routines]
+    :end-before: [END howto_operator_bigquery_list_routines]
+
+.. _howto/operator:BigQueryDeleteRoutineOperator:
+
+Delete a routine
+^^^^^^^^^^^^^^^^
+
+Use :class:`~airflow.providers.google.cloud.operators.bigquery.BigQueryDeleteRoutineOperator`
+to remove a routine. Set ``ignore_if_missing=True`` to make the delete a no-op
+when the routine does not exist.
+
+.. exampleinclude:: /../../google/tests/system/google/cloud/bigquery/example_bigquery_routines.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_bigquery_delete_routine]
+    :end-before: [END howto_operator_bigquery_delete_routine]
+
+.. _howto/sensor:BigQueryRoutineExistenceSensor:
+
+Wait for a routine
+^^^^^^^^^^^^^^^^^^
+
+Use :class:`~airflow.providers.google.cloud.sensors.bigquery.BigQueryRoutineExistenceSensor`
+to block downstream tasks until a routine exists. This is useful when routine
+creation happens in a separate DAG or an external system.
+
+.. exampleinclude:: /../../google/tests/system/google/cloud/bigquery/example_bigquery_routines.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_sensor_bigquery_routine_existence]
+    :end-before: [END howto_sensor_bigquery_routine_existence]
+
+Reference
+^^^^^^^^^
+
+For further information, look at:
+
+* `Google Cloud API Documentation <https://cloud.google.com/bigquery/docs/reference/rest/v2/routines>`__
+* `User-defined functions <https://cloud.google.com/bigquery/docs/user-defined-functions>`__
+* `Stored procedures <https://cloud.google.com/bigquery/docs/procedures>`__
+* `Table functions <https://cloud.google.com/bigquery/docs/table-functions>`__
diff --git a/providers/google/provider.yaml b/providers/google/provider.yaml
index aff958b5429..4eed2dd95e2 100644
--- a/providers/google/provider.yaml
+++ b/providers/google/provider.yaml
@@ -148,6 +148,7 @@ integrations:
   - integration-name: Google BigQuery
     how-to-guide:
       - /docs/apache-airflow-providers-google/operators/cloud/bigquery.rst
+      - /docs/apache-airflow-providers-google/operators/cloud/bigquery_routines.rst
     external-doc-url: https://cloud.google.com/bigquery/
     logo: /docs/integration-logos/BigQuery.png
     tags: [gcp]
diff --git a/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py b/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py
index 8af8aecdb6a..1aa72d81935 100644
--- a/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py
+++ b/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py
@@ -47,6 +47,7 @@ from google.cloud.bigquery import (
 )
 from google.cloud.bigquery.dataset import AccessEntry, Dataset, 
DatasetListItem, DatasetReference
 from google.cloud.bigquery.retry import DEFAULT_JOB_RETRY
+from google.cloud.bigquery.routine import Routine, RoutineReference
 from google.cloud.bigquery.table import (
     Row,
     RowIterator,
@@ -96,6 +97,21 @@ log = logging.getLogger(__name__)
 
 BigQueryJob = CopyJob | QueryJob | LoadJob | ExtractJob
 
+_ROUTINE_WRITABLE_PROPERTIES: tuple[str, ...] = (
+    "type_",
+    "language",
+    "arguments",
+    "return_type",
+    "return_table_type",
+    "imported_libraries",
+    "body",
+    "description",
+    "determinism_level",
+    "remote_function_options",
+    "data_governance_type",
+    "external_runtime_options",
+)
+
 
 class BigQueryHook(GoogleBaseHook, DbApiHook):
     """
@@ -1170,6 +1186,242 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
         )
         return table
 
+    @GoogleBaseHook.fallback_to_default_project_id
+    def create_routine(
+        self,
+        routine: Routine | dict[str, Any],
+        dataset_id: str | None = None,
+        routine_id: str | None = None,
+        project_id: str = PROVIDE_PROJECT_ID,
+        if_exists: str = "fail",
+        retry: Retry = DEFAULT_RETRY,
+        timeout: float | None = None,
+    ) -> Routine:
+        """
+        Create a new routine (UDF, procedure, or TVF) in the dataset.
+
+        :param routine: The routine to create. Either a
+            :class:`~google.cloud.bigquery.routine.Routine` instance or a dict 
in the format defined
+            at 
https://cloud.google.com/bigquery/docs/reference/rest/v2/routines#Routine. If 
the
+            routine reference is incomplete, ``dataset_id`` and ``routine_id`` 
are used to complete
+            it.
+        :param dataset_id: Optional. The dataset that will own the routine. 
Required if ``routine``
+            does not include a fully-qualified ``routineReference``.
+        :param routine_id: Optional. The routine identifier. Required if 
``routine`` does not
+            include a fully-qualified ``routineReference``.
+        :param project_id: Optional. The project that owns the dataset. Falls 
back to the hook
+            default.
+        :param if_exists: What to do if a routine with the same identifier 
already exists:
+            ``"fail"`` (default) raises 
:class:`google.api_core.exceptions.Conflict`;
+            ``"skip"`` leaves the existing routine untouched and returns it;
+            ``"replace"`` deletes the existing routine and creates the new one.
+        :param retry: Optional. A retry object used to retry requests.
+        :param timeout: Optional. The amount of time, in seconds, to wait for 
the request.
+        :return: The created (or existing) routine.
+        """
+        if if_exists not in {"fail", "skip", "replace"}:
+            raise ValueError(f"`if_exists` must be one of 'fail', 'skip', 
'replace'; got {if_exists!r}")
+        routine = self._build_routine(
+            routine, project_id=project_id, dataset_id=dataset_id, 
routine_id=routine_id
+        )
+        client = self.get_client(project_id=project_id)
+        ref = routine.reference
+        routine_path = f"{ref.project}.{ref.dataset_id}.{ref.routine_id}"
+
+        if if_exists == "replace":
+            try:
+                client.delete_routine(ref, retry=retry, timeout=timeout)
+                self.log.info("Deleted existing routine before replace: %s", 
routine_path)
+            except NotFound:
+                pass
+            result = client.create_routine(routine, exists_ok=False, 
retry=retry, timeout=timeout)
+        else:
+            result = client.create_routine(
+                routine, exists_ok=(if_exists == "skip"), retry=retry, 
timeout=timeout
+            )
+
+        self.log.info("Created routine: %s", routine_path)
+        return result
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def update_routine(
+        self,
+        routine: Routine | dict[str, Any],
+        fields: Sequence[str],
+        dataset_id: str | None = None,
+        routine_id: str | None = None,
+        project_id: str = PROVIDE_PROJECT_ID,
+        retry: Retry = DEFAULT_RETRY,
+        timeout: float | None = None,
+    ) -> Routine:
+        """
+        Update specified fields of an existing routine.
+
+        BigQuery's ``routines.update`` endpoint is a full-resource PUT (not a 
PATCH), so
+        this method fetches the existing routine, merges in the requested 
field changes,
+        and sends the complete resource back. A field listed in ``fields`` but 
absent in
+        ``routine`` is cleared on the server.
+
+        :param routine: The routine providing new values for the listed 
fields, either a
+            :class:`~google.cloud.bigquery.routine.Routine` or a Routine API 
dict (keys in
+            camelCase, e.g. ``{"description": ..., "definitionBody": ...}``).
+        :param fields: The routine properties to update, given as Routine API 
field names
+            (e.g. ``["description", "definitionBody"]``).
+        :param dataset_id: Optional. Used to complete the routine reference if 
missing.
+        :param routine_id: Optional. Used to complete the routine reference if 
missing.
+        :param project_id: Optional. The project that owns the dataset.
+        :param retry: Optional. A retry object used to retry requests.
+        :param timeout: Optional. The amount of time, in seconds, to wait for 
the request.
+        """
+        if not fields:
+            raise ValueError("`fields` must be a non-empty sequence of routine 
properties to update.")
+
+        if isinstance(routine, Routine):
+            updates_repr = routine.to_api_repr()
+        else:
+            updates_repr = dict(routine)
+
+        ref_repr = dict(updates_repr.get("routineReference") or {})
+        ref_repr.setdefault("projectId", project_id)
+        if dataset_id:
+            ref_repr.setdefault("datasetId", dataset_id)
+        if routine_id:
+            ref_repr.setdefault("routineId", routine_id)
+
+        client = self.get_client(project_id=project_id)
+        existing = 
client.get_routine(RoutineReference.from_api_repr(ref_repr), retry=retry, 
timeout=timeout)
+        merged_repr = existing.to_api_repr()
+        for field in fields:
+            if field in updates_repr:
+                merged_repr[field] = updates_repr[field]
+            else:
+                merged_repr.pop(field, None)
+
+        merged = Routine.from_api_repr(merged_repr)
+        result = client.update_routine(
+            merged, list(_ROUTINE_WRITABLE_PROPERTIES), retry=retry, 
timeout=timeout
+        )
+        out_ref = result.reference
+        self.log.info("Updated routine: %s.%s.%s", out_ref.project, 
out_ref.dataset_id, out_ref.routine_id)
+        return result
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def delete_routine(
+        self,
+        dataset_id: str,
+        routine_id: str,
+        project_id: str = PROVIDE_PROJECT_ID,
+        not_found_ok: bool = True,
+        retry: Retry = DEFAULT_RETRY,
+        timeout: float | None = None,
+    ) -> None:
+        """
+        Delete an existing routine.
+
+        :param dataset_id: The dataset that owns the routine.
+        :param routine_id: The identifier of the routine to delete.
+        :param project_id: Optional. The project that owns the dataset.
+        :param not_found_ok: If ``True`` (default), ignore "not found" errors.
+        :param retry: Optional. A retry object used to retry requests.
+        :param timeout: Optional. The amount of time, in seconds, to wait for 
the request.
+        """
+        ref = RoutineReference.from_api_repr(
+            {"projectId": project_id, "datasetId": dataset_id, "routineId": 
routine_id}
+        )
+        routine_path = f"{project_id}.{dataset_id}.{routine_id}"
+        try:
+            self.get_client(project_id=project_id).delete_routine(ref, 
retry=retry, timeout=timeout)
+        except NotFound:
+            if not not_found_ok:
+                raise
+            self.log.info("Routine not found, ignoring: %s", routine_path)
+            return
+        self.log.info("Deleted routine: %s", routine_path)
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def get_routine(
+        self,
+        dataset_id: str,
+        routine_id: str,
+        project_id: str = PROVIDE_PROJECT_ID,
+        retry: Retry = DEFAULT_RETRY,
+        timeout: float | None = None,
+    ) -> Routine:
+        """
+        Retrieve the metadata for a routine.
+
+        :param dataset_id: The dataset that owns the routine.
+        :param routine_id: The identifier of the routine to fetch.
+        :param project_id: Optional. The project that owns the dataset.
+        :param retry: Optional. A retry object used to retry requests.
+        :param timeout: Optional. The amount of time, in seconds, to wait for 
the request.
+        """
+        ref = RoutineReference.from_api_repr(
+            {"projectId": project_id, "datasetId": dataset_id, "routineId": 
routine_id}
+        )
+        return self.get_client(project_id=project_id).get_routine(ref, 
retry=retry, timeout=timeout)
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def list_routines(
+        self,
+        dataset_id: str,
+        project_id: str = PROVIDE_PROJECT_ID,
+        max_results: int | None = None,
+        page_token: str | None = None,
+        retry: Retry = DEFAULT_RETRY,
+        timeout: float | None = None,
+    ) -> list[Routine]:
+        """
+        List routines in a dataset.
+
+        :param dataset_id: The dataset to list routines for.
+        :param project_id: Optional. The project that owns the dataset.
+        :param max_results: Optional. The maximum number of routines to return.
+        :param page_token: Optional. An opaque token identifying the page of 
results to return.
+        :param retry: Optional. A retry object used to retry requests.
+        :param timeout: Optional. The amount of time, in seconds, to wait for 
the request.
+        :return: The list of routines. Only a subset of fields is populated; 
fetch individual
+            routines via :meth:`get_routine` for the complete resource.
+        """
+        dataset_ref = DatasetReference(project=project_id, 
dataset_id=dataset_id)
+        iterator = self.get_client(project_id=project_id).list_routines(
+            dataset_ref,
+            max_results=max_results,
+            page_token=page_token,
+            retry=retry,
+            timeout=timeout,
+        )
+        return list(iterator)
+
+    @staticmethod
+    def _build_routine(
+        routine: Routine | dict[str, Any],
+        project_id: str,
+        dataset_id: str | None,
+        routine_id: str | None,
+    ) -> Routine:
+        """Return a :class:`Routine` with a fully-qualified reference, filling 
gaps from kwargs."""
+        if isinstance(routine, Routine):
+            resource = routine.to_api_repr()
+        else:
+            resource = dict(routine)
+
+        ref = resource.setdefault("routineReference", {})
+        ref.setdefault("projectId", project_id)
+        if dataset_id:
+            ref.setdefault("datasetId", dataset_id)
+        if routine_id:
+            ref.setdefault("routineId", routine_id)
+
+        missing = [k for k in ("projectId", "datasetId", "routineId") if not 
ref.get(k)]
+        if missing:
+            raise ValueError(
+                "Routine reference is missing required fields "
+                f"{missing!r}. Provide them via `dataset_id`/`routine_id` or 
in the routine "
+                "reference itself."
+            )
+        return Routine.from_api_repr(resource)
+
     @GoogleBaseHook.fallback_to_default_project_id
     def poll_job_complete(
         self,
diff --git a/providers/google/src/airflow/providers/google/cloud/operators/bigquery.py b/providers/google/src/airflow/providers/google/cloud/operators/bigquery.py
index 280ae79422b..120350d5b39 100644
--- a/providers/google/src/airflow/providers/google/cloud/operators/bigquery.py
+++ b/providers/google/src/airflow/providers/google/cloud/operators/bigquery.py
@@ -30,6 +30,7 @@ from typing import TYPE_CHECKING, Any, SupportsAbs
 from google.api_core.exceptions import Conflict
 from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault
 from google.cloud.bigquery import DEFAULT_RETRY, CopyJob, ExtractJob, LoadJob, 
QueryJob, Row
+from google.cloud.bigquery.routine import Routine
 from google.cloud.bigquery.table import RowIterator, Table, TableListItem, 
TableReference
 
 from airflow.exceptions import AirflowProviderDeprecationWarning
@@ -2570,3 +2571,494 @@ class BigQueryInsertJobOperator(GoogleCloudBaseOperator, _BigQueryInsertJobOpera
             )
         else:
             self.log.info("Skipping to cancel job: %s:%s.%s", self.project_id, 
self.location, self.job_id)
+
+
+class BigQueryCreateRoutineOperator(GoogleCloudBaseOperator):
+    """
+    Create a BigQuery routine (UDF, stored procedure, table-valued function, 
or aggregate).
+
+    The routine is defined by a ``Routine`` resource as documented at
+    https://cloud.google.com/bigquery/docs/reference/rest/v2/routines#Routine. 
The full resource
+    may be passed via ``routine_resource``, or individual fields can be passed as keyword
+    arguments, which fill in keys that ``routine_resource`` leaves unset at execute time.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:BigQueryCreateRoutineOperator`
+
+    :param dataset_id: The dataset that will own the routine.
+    :param routine_id: The identifier of the routine to create.
+    :param routine_resource: The routine resource as a dict or
+        :class:`~google.cloud.bigquery.routine.Routine`. If omitted, the 
resource is assembled
+        from the individual keyword arguments below.
+    :param project_id: Optional. The project that owns the dataset. Falls back 
to the connection's
+        default.
+    :param routine_type: Optional. One of ``"SCALAR_FUNCTION"``, 
``"PROCEDURE"``,
+        ``"TABLE_VALUED_FUNCTION"``, ``"AGGREGATE_FUNCTION"``.
+    :param language: Optional. ``"SQL"`` or ``"JAVASCRIPT"``.
+    :param definition_body: Optional. The body of the routine (SQL or 
JavaScript).
+    :param arguments: Optional. Sequence of argument dicts in the Google API 
representation
+        (see the ``Routine.Argument`` schema).
+    :param return_type: Optional. The return type of the routine as a 
``StandardSqlDataType``
+        dict.
+    :param return_table_type: Optional. The return table type for table-valued 
functions as a
+        ``StandardSqlTableType`` dict.
+    :param imported_libraries: Optional. URIs of Cloud Storage libraries to 
import (JavaScript
+        UDFs only).
+    :param determinism_level: Optional. Determinism level for the routine.
+    :param security_mode: Optional. Security mode (``"DEFINER"`` or 
``"INVOKER"``).
+    :param data_governance_type: Optional. Data governance type for the 
routine.
+    :param description: Optional. Description of the routine.
+    :param remote_function_options: Optional. Options for remote functions.
+    :param spark_options: Optional. Options for Spark stored procedures.
+    :param if_exists: What to do if a routine with the same identifier already 
exists.
+        ``"fail"`` (default) raises, ``"skip"`` returns the existing routine, 
``"replace"``
+        deletes the existing routine and creates the new one.
+    :param gcp_conn_id: The connection ID used to connect to Google Cloud.
+    :param location: The location of the BigQuery dataset.
+    :param impersonation_chain: Optional service account to impersonate using 
short-term
+        credentials, or chained list of accounts.
+    :param retry: A retry object used to retry requests.
+    :param timeout: The amount of time, in seconds, to wait for the request.
+    """
+
+    template_fields: Sequence[str] = (
+        "project_id",
+        "dataset_id",
+        "routine_id",
+        "definition_body",
+        "arguments",
+        "routine_resource",
+        "gcp_conn_id",
+        "impersonation_chain",
+    )
+    template_fields_renderers = {"routine_resource": "json", "arguments": 
"json"}
+    template_ext: Sequence[str] = (".sql",)
+    ui_color = BigQueryUIColors.TABLE.value
+
+    def __init__(
+        self,
+        *,
+        dataset_id: str,
+        routine_id: str,
+        routine_resource: dict[str, Any] | Routine | None = None,
+        project_id: str = PROVIDE_PROJECT_ID,
+        routine_type: str | None = None,
+        language: str | None = None,
+        definition_body: str | None = None,
+        arguments: Sequence[dict[str, Any]] | None = None,
+        return_type: dict[str, Any] | None = None,
+        return_table_type: dict[str, Any] | None = None,
+        imported_libraries: Sequence[str] | None = None,
+        determinism_level: str | None = None,
+        security_mode: str | None = None,
+        data_governance_type: str | None = None,
+        description: str | None = None,
+        remote_function_options: dict[str, Any] | None = None,
+        spark_options: dict[str, Any] | None = None,
+        if_exists: str = "fail",
+        gcp_conn_id: str = "google_cloud_default",
+        location: str | None = None,
+        impersonation_chain: str | Sequence[str] | None = None,
+        retry: Retry = DEFAULT_RETRY,
+        timeout: float | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        if if_exists not in {"fail", "skip", "replace"}:
+            raise ValueError(f"`if_exists` must be one of 'fail', 'skip', 
'replace'; got {if_exists!r}")
+        self.project_id = project_id
+        self.dataset_id = dataset_id
+        self.routine_id = routine_id
+        self.routine_resource = routine_resource
+        self.routine_type = routine_type
+        self.language = language
+        self.definition_body = definition_body
+        self.arguments = arguments
+        self.return_type = return_type
+        self.return_table_type = return_table_type
+        self.imported_libraries = imported_libraries
+        self.determinism_level = determinism_level
+        self.security_mode = security_mode
+        self.data_governance_type = data_governance_type
+        self.description = description
+        self.remote_function_options = remote_function_options
+        self.spark_options = spark_options
+        self.if_exists = if_exists
+        self.gcp_conn_id = gcp_conn_id
+        self.location = location
+        self.impersonation_chain = impersonation_chain
+        self.retry = retry
+        self.timeout = timeout
+
+    def _build_resource(self) -> dict[str, Any]:
+        if isinstance(self.routine_resource, Routine):
+            resource = self.routine_resource.to_api_repr()
+        elif self.routine_resource is not None:
+            resource = dict(self.routine_resource)
+        else:
+            resource = {}
+
+        field_map = {
+            "routineType": self.routine_type,
+            "language": self.language,
+            "definitionBody": self.definition_body,
+            "arguments": list(self.arguments) if self.arguments is not None 
else None,
+            "returnType": self.return_type,
+            "returnTableType": self.return_table_type,
+            "importedLibraries": (
+                list(self.imported_libraries) if self.imported_libraries is 
not None else None
+            ),
+            "determinismLevel": self.determinism_level,
+            "securityMode": self.security_mode,
+            "dataGovernanceType": self.data_governance_type,
+            "description": self.description,
+            "remoteFunctionOptions": self.remote_function_options,
+            "sparkOptions": self.spark_options,
+        }
+        for key, value in field_map.items():
+            if value is not None:
+                resource.setdefault(key, value)
+        return resource
+
+    def execute(self, context: Context) -> dict[str, Any]:
+        hook = BigQueryHook(
+            gcp_conn_id=self.gcp_conn_id,
+            location=self.location,
+            impersonation_chain=self.impersonation_chain,
+        )
+        resource = self._build_resource()
+        self.log.info(
+            "Creating routine %s.%s.%s (if_exists=%s)",
+            self.project_id or hook.project_id,
+            self.dataset_id,
+            self.routine_id,
+            self.if_exists,
+        )
+        routine = hook.create_routine(
+            routine=resource,
+            dataset_id=self.dataset_id,
+            routine_id=self.routine_id,
+            project_id=self.project_id,
+            if_exists=self.if_exists,
+            retry=self.retry,
+            timeout=self.timeout,
+        )
+        return routine.to_api_repr()
+
+
+class BigQueryUpdateRoutineOperator(GoogleCloudBaseOperator):
+    """
+    Patch selected fields of an existing BigQuery routine.
+
+    Only the fields listed in ``fields`` are updated. Any field listed but 
left unset in
+    ``routine_resource`` is cleared on the server.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:BigQueryUpdateRoutineOperator`
+
+    :param dataset_id: The dataset that owns the routine.
+    :param routine_id: The identifier of the routine to update.
+    :param routine_resource: The routine resource (dict or
+        :class:`~google.cloud.bigquery.routine.Routine`) with the new values 
for the fields
+        listed in ``fields``.
+    :param fields: Properties to update (e.g. ``["definitionBody", 
"description"]``).
+    :param project_id: Optional. The project that owns the dataset.
+    :param gcp_conn_id: The connection ID used to connect to Google Cloud.
+    :param location: The location of the BigQuery dataset.
+    :param impersonation_chain: Optional service account to impersonate.
+    :param retry: A retry object used to retry requests.
+    :param timeout: The amount of time, in seconds, to wait for the request.
+    """
+
+    template_fields: Sequence[str] = (
+        "project_id",
+        "dataset_id",
+        "routine_id",
+        "routine_resource",
+        "gcp_conn_id",
+        "impersonation_chain",
+    )
+    template_fields_renderers = {"routine_resource": "json"}
+    template_ext: Sequence[str] = (".json", ".sql")
+    ui_color = BigQueryUIColors.TABLE.value
+
+    def __init__(
+        self,
+        *,
+        dataset_id: str,
+        routine_id: str,
+        routine_resource: dict[str, Any] | Routine,
+        fields: Sequence[str],
+        project_id: str = PROVIDE_PROJECT_ID,
+        gcp_conn_id: str = "google_cloud_default",
+        location: str | None = None,
+        impersonation_chain: str | Sequence[str] | None = None,
+        retry: Retry = DEFAULT_RETRY,
+        timeout: float | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        if not fields:
+            raise ValueError("`fields` must be a non-empty sequence of routine properties to update.")
+        self.project_id = project_id
+        self.dataset_id = dataset_id
+        self.routine_id = routine_id
+        self.routine_resource = routine_resource
+        self.fields = list(fields)
+        self.gcp_conn_id = gcp_conn_id
+        self.location = location
+        self.impersonation_chain = impersonation_chain
+        self.retry = retry
+        self.timeout = timeout
+
+    def execute(self, context: Context) -> dict[str, Any]:
+        hook = BigQueryHook(
+            gcp_conn_id=self.gcp_conn_id,
+            location=self.location,
+            impersonation_chain=self.impersonation_chain,
+        )
+        self.log.info(
+            "Updating routine %s.%s.%s (fields=%s)",
+            self.project_id or hook.project_id,
+            self.dataset_id,
+            self.routine_id,
+            self.fields,
+        )
+        routine = hook.update_routine(
+            routine=self.routine_resource,
+            fields=self.fields,
+            dataset_id=self.dataset_id,
+            routine_id=self.routine_id,
+            project_id=self.project_id,
+            retry=self.retry,
+            timeout=self.timeout,
+        )
+        return routine.to_api_repr()
+
+
+class BigQueryDeleteRoutineOperator(GoogleCloudBaseOperator):
+    """
+    Delete a BigQuery routine.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:BigQueryDeleteRoutineOperator`
+
+    :param dataset_id: The dataset that owns the routine.
+    :param routine_id: The identifier of the routine to delete.
+    :param project_id: Optional. The project that owns the dataset.
+    :param ignore_if_missing: If ``True``, do not fail when the routine does not exist.
+        Defaults to ``False``.
+    :param gcp_conn_id: The connection ID used to connect to Google Cloud.
+    :param location: The location of the BigQuery dataset.
+    :param impersonation_chain: Optional service account to impersonate.
+    :param retry: A retry object used to retry requests.
+    :param timeout: The amount of time, in seconds, to wait for the request.
+    """
+
+    template_fields: Sequence[str] = (
+        "project_id",
+        "dataset_id",
+        "routine_id",
+        "gcp_conn_id",
+        "impersonation_chain",
+    )
+    ui_color = BigQueryUIColors.TABLE.value
+
+    def __init__(
+        self,
+        *,
+        dataset_id: str,
+        routine_id: str,
+        project_id: str = PROVIDE_PROJECT_ID,
+        ignore_if_missing: bool = False,
+        gcp_conn_id: str = "google_cloud_default",
+        location: str | None = None,
+        impersonation_chain: str | Sequence[str] | None = None,
+        retry: Retry = DEFAULT_RETRY,
+        timeout: float | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.project_id = project_id
+        self.dataset_id = dataset_id
+        self.routine_id = routine_id
+        self.ignore_if_missing = ignore_if_missing
+        self.gcp_conn_id = gcp_conn_id
+        self.location = location
+        self.impersonation_chain = impersonation_chain
+        self.retry = retry
+        self.timeout = timeout
+
+    def execute(self, context: Context) -> None:
+        hook = BigQueryHook(
+            gcp_conn_id=self.gcp_conn_id,
+            location=self.location,
+            impersonation_chain=self.impersonation_chain,
+        )
+        self.log.info(
+            "Deleting routine %s.%s.%s",
+            self.project_id or hook.project_id,
+            self.dataset_id,
+            self.routine_id,
+        )
+        hook.delete_routine(
+            dataset_id=self.dataset_id,
+            routine_id=self.routine_id,
+            project_id=self.project_id,
+            not_found_ok=self.ignore_if_missing,
+            retry=self.retry,
+            timeout=self.timeout,
+        )
+
+
+class BigQueryGetRoutineOperator(GoogleCloudBaseOperator):
+    """
+    Fetch an existing BigQuery routine and return its serialized API representation via XCom.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:BigQueryGetRoutineOperator`
+
+    :param dataset_id: The dataset that owns the routine.
+    :param routine_id: The identifier of the routine to fetch.
+    :param project_id: Optional. The project that owns the dataset.
+    :param gcp_conn_id: The connection ID used to connect to Google Cloud.
+    :param location: The location of the BigQuery dataset.
+    :param impersonation_chain: Optional service account to impersonate.
+    :param retry: A retry object used to retry requests.
+    :param timeout: The amount of time, in seconds, to wait for the request.
+    """
+
+    template_fields: Sequence[str] = (
+        "project_id",
+        "dataset_id",
+        "routine_id",
+        "gcp_conn_id",
+        "impersonation_chain",
+    )
+    ui_color = BigQueryUIColors.TABLE.value
+
+    def __init__(
+        self,
+        *,
+        dataset_id: str,
+        routine_id: str,
+        project_id: str = PROVIDE_PROJECT_ID,
+        gcp_conn_id: str = "google_cloud_default",
+        location: str | None = None,
+        impersonation_chain: str | Sequence[str] | None = None,
+        retry: Retry = DEFAULT_RETRY,
+        timeout: float | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.project_id = project_id
+        self.dataset_id = dataset_id
+        self.routine_id = routine_id
+        self.gcp_conn_id = gcp_conn_id
+        self.location = location
+        self.impersonation_chain = impersonation_chain
+        self.retry = retry
+        self.timeout = timeout
+
+    def execute(self, context: Context) -> dict[str, Any]:
+        hook = BigQueryHook(
+            gcp_conn_id=self.gcp_conn_id,
+            location=self.location,
+            impersonation_chain=self.impersonation_chain,
+        )
+        self.log.info(
+            "Fetching routine %s.%s.%s",
+            self.project_id or hook.project_id,
+            self.dataset_id,
+            self.routine_id,
+        )
+        routine = hook.get_routine(
+            dataset_id=self.dataset_id,
+            routine_id=self.routine_id,
+            project_id=self.project_id,
+            retry=self.retry,
+            timeout=self.timeout,
+        )
+        return routine.to_api_repr()
+
+
+class BigQueryListRoutinesOperator(GoogleCloudBaseOperator):
+    """
+    List routines in a BigQuery dataset and return them as a list via XCom.
+
+    The returned items are API representations. Only a subset of routine fields is populated
+    on list responses; fetch individual routines with :class:`BigQueryGetRoutineOperator` for
+    the complete resource.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:BigQueryListRoutinesOperator`
+
+    :param dataset_id: The dataset to list routines for.
+    :param project_id: Optional. The project that owns the dataset.
+    :param max_results: Optional. Maximum number of routines to return.
+    :param page_token: Optional. Token identifying a page of results to return.
+    :param gcp_conn_id: The connection ID used to connect to Google Cloud.
+    :param location: The location of the BigQuery dataset.
+    :param impersonation_chain: Optional service account to impersonate.
+    :param retry: A retry object used to retry requests.
+    :param timeout: The amount of time, in seconds, to wait for the request.
+    """
+
+    template_fields: Sequence[str] = (
+        "project_id",
+        "dataset_id",
+        "gcp_conn_id",
+        "impersonation_chain",
+    )
+    ui_color = BigQueryUIColors.TABLE.value
+
+    def __init__(
+        self,
+        *,
+        dataset_id: str,
+        project_id: str = PROVIDE_PROJECT_ID,
+        max_results: int | None = None,
+        page_token: str | None = None,
+        gcp_conn_id: str = "google_cloud_default",
+        location: str | None = None,
+        impersonation_chain: str | Sequence[str] | None = None,
+        retry: Retry = DEFAULT_RETRY,
+        timeout: float | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.project_id = project_id
+        self.dataset_id = dataset_id
+        self.max_results = max_results
+        self.page_token = page_token
+        self.gcp_conn_id = gcp_conn_id
+        self.location = location
+        self.impersonation_chain = impersonation_chain
+        self.retry = retry
+        self.timeout = timeout
+
+    def execute(self, context: Context) -> list[dict[str, Any]]:
+        hook = BigQueryHook(
+            gcp_conn_id=self.gcp_conn_id,
+            location=self.location,
+            impersonation_chain=self.impersonation_chain,
+        )
+        self.log.info(
+            "Listing routines in %s.%s",
+            self.project_id or hook.project_id,
+            self.dataset_id,
+        )
+        routines = hook.list_routines(
+            dataset_id=self.dataset_id,
+            project_id=self.project_id,
+            max_results=self.max_results,
+            page_token=self.page_token,
+            retry=self.retry,
+            timeout=self.timeout,
+        )
+        return [routine.to_api_repr() for routine in routines]
diff --git a/providers/google/src/airflow/providers/google/cloud/sensors/bigquery.py b/providers/google/src/airflow/providers/google/cloud/sensors/bigquery.py
index 5b891bb075c..71e37bff784 100644
--- a/providers/google/src/airflow/providers/google/cloud/sensors/bigquery.py
+++ b/providers/google/src/airflow/providers/google/cloud/sensors/bigquery.py
@@ -24,6 +24,8 @@ from collections.abc import Sequence
 from datetime import timedelta
 from typing import TYPE_CHECKING, Any
 
+from google.api_core.exceptions import NotFound
+
 from airflow.exceptions import AirflowProviderDeprecationWarning
 from airflow.providers.common.compat.sdk import AirflowException, BaseSensorOperator, conf
 from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
@@ -148,6 +150,67 @@ class BigQueryTableExistenceSensor(BaseSensorOperator):
         raise AirflowException(message)
 
 
+class BigQueryRoutineExistenceSensor(BaseSensorOperator):
+    """
+    Checks for the existence of a routine (UDF, procedure, or TVF) in a BigQuery dataset.
+
+    :param project_id: The Google Cloud project that owns the dataset.
+    :param dataset_id: The dataset that owns the routine.
+    :param routine_id: The identifier of the routine to check.
+    :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    """
+
+    template_fields: Sequence[str] = (
+        "project_id",
+        "dataset_id",
+        "routine_id",
+        "impersonation_chain",
+    )
+    ui_color = "#f0eee4"
+
+    def __init__(
+        self,
+        *,
+        project_id: str,
+        dataset_id: str,
+        routine_id: str,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: str | Sequence[str] | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.project_id = project_id
+        self.dataset_id = dataset_id
+        self.routine_id = routine_id
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def poke(self, context: Context) -> bool:
+        routine_uri = f"{self.project_id}.{self.dataset_id}.{self.routine_id}"
+        self.log.info("Sensor checks existence of routine: %s", routine_uri)
+        hook = BigQueryHook(
+            gcp_conn_id=self.gcp_conn_id,
+            impersonation_chain=self.impersonation_chain,
+        )
+        try:
+            hook.get_routine(
+                project_id=self.project_id,
+                dataset_id=self.dataset_id,
+                routine_id=self.routine_id,
+            )
+        except NotFound:
+            return False
+        return True
+
+
 class BigQueryTablePartitionExistenceSensor(BaseSensorOperator):
     """
     Checks for the existence of a partition within a table in Google Bigquery.
diff --git a/providers/google/src/airflow/providers/google/get_provider_info.py b/providers/google/src/airflow/providers/google/get_provider_info.py
index d0122a94c51..c172eee6e87 100644
--- a/providers/google/src/airflow/providers/google/get_provider_info.py
+++ b/providers/google/src/airflow/providers/google/get_provider_info.py
@@ -52,7 +52,10 @@ def get_provider_info():
             },
             {
                 "integration-name": "Google BigQuery",
-                "how-to-guide": ["/docs/apache-airflow-providers-google/operators/cloud/bigquery.rst"],
+                "how-to-guide": [
+                    "/docs/apache-airflow-providers-google/operators/cloud/bigquery.rst",
+                    "/docs/apache-airflow-providers-google/operators/cloud/bigquery_routines.rst",
+                ],
                 "external-doc-url": "https://cloud.google.com/bigquery/",
                 "logo": "/docs/integration-logos/BigQuery.png",
                 "tags": ["gcp"],
diff --git a/providers/google/tests/system/google/cloud/bigquery/example_bigquery_routines.py b/providers/google/tests/system/google/cloud/bigquery/example_bigquery_routines.py
new file mode 100644
index 00000000000..03d238ef1b0
--- /dev/null
+++ b/providers/google/tests/system/google/cloud/bigquery/example_bigquery_routines.py
@@ -0,0 +1,219 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Example Airflow DAG for Google BigQuery service testing routine operations.
+
+Exercises the full BigQuery routines lifecycle through Airflow: create a scalar SQL
+UDF, a stored procedure, and a table-valued function; verify their existence; list
+and fetch them; update one; and delete them all.
+"""
+
+from __future__ import annotations
+
+import os
+from datetime import datetime
+
+from airflow.models.dag import DAG
+from airflow.providers.google.cloud.operators.bigquery import (
+    BigQueryCreateEmptyDatasetOperator,
+    BigQueryCreateRoutineOperator,
+    BigQueryDeleteDatasetOperator,
+    BigQueryDeleteRoutineOperator,
+    BigQueryGetRoutineOperator,
+    BigQueryListRoutinesOperator,
+    BigQueryUpdateRoutineOperator,
+)
+from airflow.providers.google.cloud.sensors.bigquery import BigQueryRoutineExistenceSensor
+
+try:
+    from airflow.sdk import TriggerRule
+except ImportError:
+    # Compatibility for Airflow < 3.1
+    from airflow.utils.trigger_rule import TriggerRule  # type: ignore[no-redef,attr-defined]
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") or "default"
+DAG_ID = "bigquery_routines"
+
+DATASET_NAME = f"dataset_{DAG_ID}_{ENV_ID}"
+SCALAR_ROUTINE = f"scalar_udf_{ENV_ID}"
+PROCEDURE_ROUTINE = f"procedure_{ENV_ID}"
+TVF_ROUTINE = f"tvf_{ENV_ID}"
+
+INT64_TYPE = {"typeKind": "INT64"}
+STRING_TYPE = {"typeKind": "STRING"}
+
+
+with DAG(
+    DAG_ID,
+    schedule="@once",
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    tags=["example", "bigquery"],
+) as dag:
+    create_dataset = BigQueryCreateEmptyDatasetOperator(
+        task_id="create_dataset", dataset_id=DATASET_NAME, project_id=PROJECT_ID
+    )
+
+    # [START howto_operator_bigquery_create_scalar_routine]
+    create_scalar_routine = BigQueryCreateRoutineOperator(
+        task_id="create_scalar_routine",
+        project_id=PROJECT_ID,
+        dataset_id=DATASET_NAME,
+        routine_id=SCALAR_ROUTINE,
+        routine_type="SCALAR_FUNCTION",
+        language="SQL",
+        arguments=[{"name": "x", "dataType": INT64_TYPE}],
+        return_type=INT64_TYPE,
+        definition_body="x + 1",
+        description="Adds one to its argument.",
+    )
+    # [END howto_operator_bigquery_create_scalar_routine]
+
+    # [START howto_operator_bigquery_create_procedure_routine]
+    create_procedure = BigQueryCreateRoutineOperator(
+        task_id="create_procedure",
+        project_id=PROJECT_ID,
+        dataset_id=DATASET_NAME,
+        routine_id=PROCEDURE_ROUTINE,
+        routine_type="PROCEDURE",
+        language="SQL",
+        arguments=[
+            {"name": "prefix", "dataType": STRING_TYPE, "argumentKind": "FIXED_TYPE"},
+        ],
+        definition_body="BEGIN SELECT CONCAT(prefix, ' world') AS greeting; END",
+        description="Echoes a prefixed greeting.",
+    )
+    # [END howto_operator_bigquery_create_procedure_routine]
+
+    # [START howto_operator_bigquery_create_tvf_routine]
+    create_tvf = BigQueryCreateRoutineOperator(
+        task_id="create_tvf",
+        project_id=PROJECT_ID,
+        dataset_id=DATASET_NAME,
+        routine_id=TVF_ROUTINE,
+        routine_type="TABLE_VALUED_FUNCTION",
+        language="SQL",
+        arguments=[{"name": "n", "dataType": INT64_TYPE}],
+        return_table_type={
+            "columns": [
+                {"name": "value", "type": INT64_TYPE},
+            ]
+        },
+        definition_body="SELECT * FROM UNNEST(GENERATE_ARRAY(1, n)) AS value",
+        description="Generates integers 1..n as a table.",
+    )
+    # [END howto_operator_bigquery_create_tvf_routine]
+
+    # [START howto_sensor_bigquery_routine_existence]
+    wait_for_routine = BigQueryRoutineExistenceSensor(
+        task_id="wait_for_routine",
+        project_id=PROJECT_ID,
+        dataset_id=DATASET_NAME,
+        routine_id=SCALAR_ROUTINE,
+        timeout=60,
+        poke_interval=5,
+    )
+    # [END howto_sensor_bigquery_routine_existence]
+
+    # [START howto_operator_bigquery_update_routine]
+    update_routine = BigQueryUpdateRoutineOperator(
+        task_id="update_routine",
+        project_id=PROJECT_ID,
+        dataset_id=DATASET_NAME,
+        routine_id=SCALAR_ROUTINE,
+        routine_resource={"description": "Updated description for scalar UDF"},
+        fields=["description"],
+    )
+    # [END howto_operator_bigquery_update_routine]
+
+    # [START howto_operator_bigquery_get_routine]
+    get_routine = BigQueryGetRoutineOperator(
+        task_id="get_routine",
+        project_id=PROJECT_ID,
+        dataset_id=DATASET_NAME,
+        routine_id=SCALAR_ROUTINE,
+    )
+    # [END howto_operator_bigquery_get_routine]
+
+    # [START howto_operator_bigquery_list_routines]
+    list_routines = BigQueryListRoutinesOperator(
+        task_id="list_routines",
+        project_id=PROJECT_ID,
+        dataset_id=DATASET_NAME,
+    )
+    # [END howto_operator_bigquery_list_routines]
+
+    # [START howto_operator_bigquery_delete_routine]
+    delete_scalar_routine = BigQueryDeleteRoutineOperator(
+        task_id="delete_scalar_routine",
+        project_id=PROJECT_ID,
+        dataset_id=DATASET_NAME,
+        routine_id=SCALAR_ROUTINE,
+        ignore_if_missing=True,
+    )
+    # [END howto_operator_bigquery_delete_routine]
+
+    delete_procedure = BigQueryDeleteRoutineOperator(
+        task_id="delete_procedure",
+        project_id=PROJECT_ID,
+        dataset_id=DATASET_NAME,
+        routine_id=PROCEDURE_ROUTINE,
+        ignore_if_missing=True,
+    )
+    delete_tvf = BigQueryDeleteRoutineOperator(
+        task_id="delete_tvf",
+        project_id=PROJECT_ID,
+        dataset_id=DATASET_NAME,
+        routine_id=TVF_ROUTINE,
+        ignore_if_missing=True,
+    )
+
+    delete_dataset = BigQueryDeleteDatasetOperator(
+        task_id="delete_dataset",
+        project_id=PROJECT_ID,
+        dataset_id=DATASET_NAME,
+        delete_contents=True,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    (
+        # TEST SETUP
+        create_dataset
+        # TEST BODY
+        >> [create_scalar_routine, create_procedure, create_tvf]
+        >> wait_for_routine
+        >> update_routine
+        >> get_routine
+        >> list_routines
+        >> [delete_scalar_routine, delete_procedure, delete_tvf]
+        # TEST TEARDOWN
+        >> delete_dataset
+    )
+
+    from tests_common.test_utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+
+from tests_common.test_utils.system_tests import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: contributing-docs/testing/system_tests.rst)
+test_run = get_test_run(dag)
diff --git a/providers/google/tests/unit/google/cloud/hooks/test_bigquery.py b/providers/google/tests/unit/google/cloud/hooks/test_bigquery.py
index 054eefb1a11..ddda199849c 100644
--- a/providers/google/tests/unit/google/cloud/hooks/test_bigquery.py
+++ b/providers/google/tests/unit/google/cloud/hooks/test_bigquery.py
@@ -36,6 +36,7 @@ from google.cloud.bigquery import (
     TableReference,
 )
 from google.cloud.bigquery.dataset import AccessEntry, Dataset, DatasetListItem
+from google.cloud.bigquery.routine import Routine
 from google.cloud.bigquery.table import _EmptyRowIterator
 from google.cloud.exceptions import NotFound
 
@@ -1012,6 +1013,177 @@ class TestTableOperations(_BigQueryBaseTestClass):
         )
 
 
+@pytest.mark.db_test
+class TestRoutineOperations(_BigQueryBaseTestClass):
+    ROUTINE_ID = "bq_routine"
+    ROUTINE_REF_REPR = {
+        "projectId": PROJECT_ID,
+        "datasetId": DATASET_ID,
+        "routineId": ROUTINE_ID,
+    }
+    ROUTINE_RESOURCE = {
+        "routineReference": ROUTINE_REF_REPR,
+        "routineType": "SCALAR_FUNCTION",
+        "language": "SQL",
+        "definitionBody": "x + 1",
+        "arguments": [{"name": "x", "dataType": {"typeKind": "INT64"}}],
+        "returnType": {"typeKind": "INT64"},
+    }
+
+    @mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.get_client")
+    def test_create_routine_fail_mode(self, mock_client):
+        self.hook.create_routine(
+            routine=dict(self.ROUTINE_RESOURCE),
+            project_id=PROJECT_ID,
+        )
+        mock_client.assert_called_once_with(project_id=PROJECT_ID)
+        call = mock_client.return_value.create_routine.call_args
+        assert call.kwargs["exists_ok"] is False
+        mock_client.return_value.delete_routine.assert_not_called()
+
+    @mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.get_client")
+    def test_create_routine_skip_mode(self, mock_client):
+        self.hook.create_routine(
+            routine=dict(self.ROUTINE_RESOURCE),
+            project_id=PROJECT_ID,
+            if_exists="skip",
+        )
+        assert mock_client.return_value.create_routine.call_args.kwargs["exists_ok"] is True
+        mock_client.return_value.delete_routine.assert_not_called()
+
+    @mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.get_client")
+    def test_create_routine_replace_mode_deletes_first(self, mock_client):
+        self.hook.create_routine(
+            routine=dict(self.ROUTINE_RESOURCE),
+            project_id=PROJECT_ID,
+            if_exists="replace",
+        )
+        mock_client.return_value.delete_routine.assert_called_once()
+        create_call = mock_client.return_value.create_routine.call_args
+        assert create_call.kwargs["exists_ok"] is False
+
+    @mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.get_client")
+    def test_create_routine_replace_ignores_not_found_on_delete(self, mock_client):
+        mock_client.return_value.delete_routine.side_effect = NotFound("nope")
+        self.hook.create_routine(
+            routine=dict(self.ROUTINE_RESOURCE),
+            project_id=PROJECT_ID,
+            if_exists="replace",
+        )
+        mock_client.return_value.create_routine.assert_called_once()
+
+    def test_create_routine_invalid_if_exists(self):
+        with pytest.raises(ValueError, match="must be one of"):
+            self.hook.create_routine(
+                routine=dict(self.ROUTINE_RESOURCE),
+                project_id=PROJECT_ID,
+                if_exists="bogus",
+            )
+
+    @mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.get_client")
+    def test_create_routine_fills_missing_reference(self, mock_client):
+        body = {
+            "routineType": "SCALAR_FUNCTION",
+            "language": "SQL",
+            "definitionBody": "1",
+        }
+        self.hook.create_routine(
+            routine=body,
+            dataset_id=DATASET_ID,
+            routine_id=self.ROUTINE_ID,
+            project_id=PROJECT_ID,
+        )
+        create_call = mock_client.return_value.create_routine.call_args
+        created_routine = (
+            create_call.kwargs["routine"] if "routine" in create_call.kwargs else create_call.args[0]
+        )
+        ref = created_routine.reference
+        assert ref.project == PROJECT_ID
+        assert ref.dataset_id == DATASET_ID
+        assert ref.routine_id == self.ROUTINE_ID
+
+    def test_create_routine_missing_reference_raises(self):
+        with pytest.raises(ValueError, match="missing required fields"):
+            self.hook.create_routine(
+                routine={"routineType": "SCALAR_FUNCTION", "language": "SQL", "definitionBody": "1"},
+                project_id=PROJECT_ID,
+            )
+
+    @mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.get_client")
+    def test_update_routine(self, mock_client):
+        existing_repr = dict(self.ROUTINE_RESOURCE)
+        existing_repr["description"] = "original"
+        mock_client.return_value.get_routine.return_value = Routine.from_api_repr(existing_repr)
+
+        self.hook.update_routine(
+            routine={"description": "new", "definitionBody": "x + 2"},
+            fields=["description", "definitionBody"],
+            dataset_id=DATASET_ID,
+            routine_id=self.ROUTINE_ID,
+            project_id=PROJECT_ID,
+        )
+
+        mock_client.return_value.get_routine.assert_called_once()
+        update_call = mock_client.return_value.update_routine.call_args
+        sent_routine = update_call.args[0] if update_call.args else update_call.kwargs["routine"]
+        sent_fields = update_call.args[1] if len(update_call.args) > 1 else update_call.kwargs["fields"]
+        # Merged resource carries the updated values plus the untouched original metadata.
+        assert sent_routine.description == "new"
+        assert sent_routine.body == "x + 2"
+        assert sent_routine.type_ == "SCALAR_FUNCTION"
+        # Full-resource PUT: all writable properties are included in the outbound fields list.
+        assert "description" in sent_fields
+        assert "body" in sent_fields
+        assert "type_" in sent_fields
+
+    @mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.get_client")
+    def test_delete_routine(self, mock_client):
+        self.hook.delete_routine(
+            dataset_id=DATASET_ID,
+            routine_id=self.ROUTINE_ID,
+            project_id=PROJECT_ID,
+        )
+        mock_client.return_value.delete_routine.assert_called_once()
+
+    @mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.get_client")
+    def test_delete_routine_not_found_ok(self, mock_client):
+        mock_client.return_value.delete_routine.side_effect = NotFound("nope")
+        # default not_found_ok=True — should not raise
+        self.hook.delete_routine(
+            dataset_id=DATASET_ID,
+            routine_id=self.ROUTINE_ID,
+            project_id=PROJECT_ID,
+        )
+
+    @mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.get_client")
+    def test_delete_routine_raises_when_not_found_ok_false(self, mock_client):
+        mock_client.return_value.delete_routine.side_effect = NotFound("nope")
+        with pytest.raises(NotFound):
+            self.hook.delete_routine(
+                dataset_id=DATASET_ID,
+                routine_id=self.ROUTINE_ID,
+                project_id=PROJECT_ID,
+                not_found_ok=False,
+            )
+
+    @mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.get_client")
+    def test_get_routine(self, mock_client):
+        self.hook.get_routine(
+            dataset_id=DATASET_ID,
+            routine_id=self.ROUTINE_ID,
+            project_id=PROJECT_ID,
+        )
+        mock_client.return_value.get_routine.assert_called_once()
+
+    @mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.get_client")
+    def test_list_routines(self, mock_client):
+        mock_client.return_value.list_routines.return_value = iter([mock.MagicMock(), mock.MagicMock()])
+        result = self.hook.list_routines(dataset_id=DATASET_ID, project_id=PROJECT_ID)
+        assert len(result) == 2
+        list_call = mock_client.return_value.list_routines.call_args
+        assert list_call.kwargs["max_results"] is None
+
+
 @pytest.mark.db_test
 class TestBigQueryCursor(_BigQueryBaseTestClass):
     @mock.patch("airflow.providers.google.cloud.hooks.bigquery.build")
diff --git a/providers/google/tests/unit/google/cloud/operators/test_bigquery.py b/providers/google/tests/unit/google/cloud/operators/test_bigquery.py
index 70427e794e6..f857c6713d4 100644
--- a/providers/google/tests/unit/google/cloud/operators/test_bigquery.py
+++ b/providers/google/tests/unit/google/cloud/operators/test_bigquery.py
@@ -27,6 +27,7 @@ from unittest.mock import ANY, MagicMock
 import pandas as pd
 import pytest
 from google.cloud.bigquery import DEFAULT_RETRY, ScalarQueryParameter, Table
+from google.cloud.bigquery.routine import Routine
 from google.cloud.exceptions import Conflict
 
 from airflow.providers.common.compat.openlineage.facet import (
@@ -52,16 +53,21 @@ from airflow.providers.google.cloud.operators.bigquery import (
     BigQueryCheckOperator,
     BigQueryColumnCheckOperator,
     BigQueryCreateEmptyDatasetOperator,
+    BigQueryCreateRoutineOperator,
     BigQueryCreateTableOperator,
     BigQueryDeleteDatasetOperator,
+    BigQueryDeleteRoutineOperator,
     BigQueryDeleteTableOperator,
     BigQueryGetDataOperator,
     BigQueryGetDatasetOperator,
     BigQueryGetDatasetTablesOperator,
+    BigQueryGetRoutineOperator,
     BigQueryInsertJobOperator,
     BigQueryIntervalCheckOperator,
+    BigQueryListRoutinesOperator,
     BigQueryTableCheckOperator,
     BigQueryUpdateDatasetOperator,
+    BigQueryUpdateRoutineOperator,
     BigQueryUpdateTableOperator,
     BigQueryUpdateTableSchemaOperator,
     BigQueryUpsertTableOperator,
@@ -2877,3 +2883,180 @@ class TestBigQueryTableCheckOperator:
             job_id="",
             nowait=False,
         )
+
+
+TEST_ROUTINE_ID = "test-routine-id"
+TEST_ROUTINE_REF = {
+    "projectId": TEST_GCP_PROJECT_ID,
+    "datasetId": TEST_DATASET,
+    "routineId": TEST_ROUTINE_ID,
+}
+TEST_ROUTINE_RESOURCE = {
+    "routineReference": TEST_ROUTINE_REF,
+    "routineType": "SCALAR_FUNCTION",
+    "language": "SQL",
+    "definitionBody": "x + 1",
+    "arguments": [{"name": "x", "dataType": {"typeKind": "INT64"}}],
+    "returnType": {"typeKind": "INT64"},
+}
+
+
+class TestBigQueryCreateRoutineOperator:
+    @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
+    def test_execute_with_resource(self, mock_hook):
+        mock_hook.return_value.create_routine.return_value.to_api_repr.return_value = TEST_ROUTINE_RESOURCE
+        operator = BigQueryCreateRoutineOperator(
+            task_id=TASK_ID,
+            dataset_id=TEST_DATASET,
+            routine_id=TEST_ROUTINE_ID,
+            project_id=TEST_GCP_PROJECT_ID,
+            routine_resource=dict(TEST_ROUTINE_RESOURCE),
+        )
+        result = operator.execute(context=MagicMock())
+
+        assert result == TEST_ROUTINE_RESOURCE
+        mock_hook.return_value.create_routine.assert_called_once()
+        call_kwargs = mock_hook.return_value.create_routine.call_args.kwargs
+        assert call_kwargs["dataset_id"] == TEST_DATASET
+        assert call_kwargs["routine_id"] == TEST_ROUTINE_ID
+        assert call_kwargs["project_id"] == TEST_GCP_PROJECT_ID
+        assert call_kwargs["if_exists"] == "fail"
+
+    @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
+    def test_execute_builds_resource_from_fields(self, mock_hook):
+        mock_hook.return_value.create_routine.return_value.to_api_repr.return_value = {}
+        operator = BigQueryCreateRoutineOperator(
+            task_id=TASK_ID,
+            dataset_id=TEST_DATASET,
+            routine_id=TEST_ROUTINE_ID,
+            project_id=TEST_GCP_PROJECT_ID,
+            routine_type="SCALAR_FUNCTION",
+            language="SQL",
+            definition_body="x + 1",
+            arguments=[{"name": "x", "dataType": {"typeKind": "INT64"}}],
+            return_type={"typeKind": "INT64"},
+            if_exists="replace",
+        )
+        operator.execute(context=MagicMock())
+
+        call_kwargs = mock_hook.return_value.create_routine.call_args.kwargs
+        routine_resource = call_kwargs["routine"]
+        assert routine_resource["routineType"] == "SCALAR_FUNCTION"
+        assert routine_resource["language"] == "SQL"
+        assert routine_resource["definitionBody"] == "x + 1"
+        assert routine_resource["returnType"] == {"typeKind": "INT64"}
+        assert call_kwargs["if_exists"] == "replace"
+
+    def test_invalid_if_exists(self):
+        with pytest.raises(ValueError, match="must be one of"):
+            BigQueryCreateRoutineOperator(
+                task_id=TASK_ID,
+                dataset_id=TEST_DATASET,
+                routine_id=TEST_ROUTINE_ID,
+                if_exists="bogus",
+            )
+
+
+class TestBigQueryUpdateRoutineOperator:
+    @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
+    def test_execute(self, mock_hook):
+        mock_hook.return_value.update_routine.return_value.to_api_repr.return_value = TEST_ROUTINE_RESOURCE
+        operator = BigQueryUpdateRoutineOperator(
+            task_id=TASK_ID,
+            dataset_id=TEST_DATASET,
+            routine_id=TEST_ROUTINE_ID,
+            project_id=TEST_GCP_PROJECT_ID,
+            routine_resource=dict(TEST_ROUTINE_RESOURCE),
+            fields=["description", "definitionBody"],
+        )
+        result = operator.execute(context=MagicMock())
+
+        assert result == TEST_ROUTINE_RESOURCE
+        call_kwargs = mock_hook.return_value.update_routine.call_args.kwargs
+        assert call_kwargs["fields"] == ["description", "definitionBody"]
+        assert call_kwargs["dataset_id"] == TEST_DATASET
+        assert call_kwargs["routine_id"] == TEST_ROUTINE_ID
+
+    def test_empty_fields_raises(self):
+        with pytest.raises(ValueError, match="non-empty sequence"):
+            BigQueryUpdateRoutineOperator(
+                task_id=TASK_ID,
+                dataset_id=TEST_DATASET,
+                routine_id=TEST_ROUTINE_ID,
+                routine_resource=dict(TEST_ROUTINE_RESOURCE),
+                fields=[],
+            )
+
+
+class TestBigQueryDeleteRoutineOperator:
+    @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
+    def test_execute(self, mock_hook):
+        operator = BigQueryDeleteRoutineOperator(
+            task_id=TASK_ID,
+            dataset_id=TEST_DATASET,
+            routine_id=TEST_ROUTINE_ID,
+            project_id=TEST_GCP_PROJECT_ID,
+        )
+        operator.execute(context=MagicMock())
+
+        call_kwargs = mock_hook.return_value.delete_routine.call_args.kwargs
+        assert call_kwargs["dataset_id"] == TEST_DATASET
+        assert call_kwargs["routine_id"] == TEST_ROUTINE_ID
+        assert call_kwargs["project_id"] == TEST_GCP_PROJECT_ID
+        assert call_kwargs["not_found_ok"] is False
+
+    @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
+    def test_execute_ignore_if_missing(self, mock_hook):
+        operator = BigQueryDeleteRoutineOperator(
+            task_id=TASK_ID,
+            dataset_id=TEST_DATASET,
+            routine_id=TEST_ROUTINE_ID,
+            project_id=TEST_GCP_PROJECT_ID,
+            ignore_if_missing=True,
+        )
+        operator.execute(context=MagicMock())
+        call_kwargs = mock_hook.return_value.delete_routine.call_args.kwargs
+        assert call_kwargs["not_found_ok"] is True
+
+
+class TestBigQueryGetRoutineOperator:
+    @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
+    def test_execute(self, mock_hook):
+        mock_hook.return_value.get_routine.return_value.to_api_repr.return_value = TEST_ROUTINE_RESOURCE
+        operator = BigQueryGetRoutineOperator(
+            task_id=TASK_ID,
+            dataset_id=TEST_DATASET,
+            routine_id=TEST_ROUTINE_ID,
+            project_id=TEST_GCP_PROJECT_ID,
+        )
+        result = operator.execute(context=MagicMock())
+
+        assert result == TEST_ROUTINE_RESOURCE
+        call_kwargs = mock_hook.return_value.get_routine.call_args.kwargs
+        assert call_kwargs["dataset_id"] == TEST_DATASET
+        assert call_kwargs["routine_id"] == TEST_ROUTINE_ID
+
+
+class TestBigQueryListRoutinesOperator:
+    @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
+    def test_execute(self, mock_hook):
+        first, second = MagicMock(spec=Routine), MagicMock(spec=Routine)
+        first.to_api_repr.return_value = {"routineReference": {"routineId": "r1"}}
+        second.to_api_repr.return_value = {"routineReference": {"routineId": "r2"}}
+        mock_hook.return_value.list_routines.return_value = [first, second]
+
+        operator = BigQueryListRoutinesOperator(
+            task_id=TASK_ID,
+            dataset_id=TEST_DATASET,
+            project_id=TEST_GCP_PROJECT_ID,
+            max_results=10,
+        )
+        result = operator.execute(context=MagicMock())
+
+        assert result == [
+            {"routineReference": {"routineId": "r1"}},
+            {"routineReference": {"routineId": "r2"}},
+        ]
+        call_kwargs = mock_hook.return_value.list_routines.call_args.kwargs
+        assert call_kwargs["dataset_id"] == TEST_DATASET
+        assert call_kwargs["max_results"] == 10
diff --git a/providers/google/tests/unit/google/cloud/sensors/test_bigquery.py b/providers/google/tests/unit/google/cloud/sensors/test_bigquery.py
index 00776d89013..174f3b217bd 100644
--- a/providers/google/tests/unit/google/cloud/sensors/test_bigquery.py
+++ b/providers/google/tests/unit/google/cloud/sensors/test_bigquery.py
@@ -19,9 +19,11 @@ from __future__ import annotations
 from unittest import mock
 
 import pytest
+from google.api_core.exceptions import NotFound
 
 from airflow.providers.common.compat.sdk import AirflowException, TaskDeferred
 from airflow.providers.google.cloud.sensors.bigquery import (
+    BigQueryRoutineExistenceSensor,
     BigQueryTableExistenceSensor,
     BigQueryTablePartitionExistenceSensor,
 )
@@ -245,6 +247,43 @@ class TestBigqueryTablePartitionExistenceSensor:
         )
 
 
+class TestBigQueryRoutineExistenceSensor:
+    ROUTINE_ID = "test_routine_id"
+
+    @mock.patch("airflow.providers.google.cloud.sensors.bigquery.BigQueryHook")
+    def test_poke_returns_true_when_routine_exists(self, mock_hook):
+        task = BigQueryRoutineExistenceSensor(
+            task_id="task-id",
+            project_id=TEST_PROJECT_ID,
+            dataset_id=TEST_DATASET_ID,
+            routine_id=self.ROUTINE_ID,
+            gcp_conn_id=TEST_GCP_CONN_ID,
+            impersonation_chain=TEST_IMPERSONATION_CHAIN,
+        )
+        assert task.poke(mock.MagicMock()) is True
+
+        mock_hook.assert_called_once_with(
+            gcp_conn_id=TEST_GCP_CONN_ID,
+            impersonation_chain=TEST_IMPERSONATION_CHAIN,
+        )
+        mock_hook.return_value.get_routine.assert_called_once_with(
+            project_id=TEST_PROJECT_ID,
+            dataset_id=TEST_DATASET_ID,
+            routine_id=self.ROUTINE_ID,
+        )
+
+    @mock.patch("airflow.providers.google.cloud.sensors.bigquery.BigQueryHook")
+    def test_poke_returns_false_when_routine_missing(self, mock_hook):
+        mock_hook.return_value.get_routine.side_effect = NotFound("not found")
+        task = BigQueryRoutineExistenceSensor(
+            task_id="task-id",
+            project_id=TEST_PROJECT_ID,
+            dataset_id=TEST_DATASET_ID,
+            routine_id=self.ROUTINE_ID,
+        )
+        assert task.poke(mock.MagicMock()) is False
+
+
 @pytest.fixture
 def context():
     """

Reply via email to