This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new cf1e26b046 Add BigQueryToPostgresOperator (#30658)
cf1e26b046 is described below

commit cf1e26b04669ad2b232618fee623e6bc7797a13a
Author: Jean-Baptiste Braun <[email protected]>
AuthorDate: Tue May 16 13:20:57 2023 +0200

    Add BigQueryToPostgresOperator (#30658)
    
    Co-authored-by: eladkal <[email protected]>
---
 .../google/cloud/transfers/bigquery_to_mssql.py    | 117 +++++++-------------
 .../google/cloud/transfers/bigquery_to_mysql.py    | 118 +++++----------------
 .../google/cloud/transfers/bigquery_to_postgres.py |  56 ++++++++++
 .../{bigquery_to_mysql.py => bigquery_to_sql.py}   |  53 ++++-----
 airflow/providers/google/provider.yaml             |   7 ++
 .../operators/transfer/bigquery_to_mysql.rst       |   2 +-
 ...query_to_mysql.rst => bigquery_to_postgres.rst} |  26 ++---
 tests/always/test_project_structure.py             |   2 +-
 .../cloud/transfers/test_bigquery_to_mssql.py      |   6 +-
 .../cloud/transfers/test_bigquery_to_mysql.py      |   6 +-
 ...ry_to_mysql.py => test_bigquery_to_postgres.py} |  14 ++-
 .../cloud/bigquery/example_bigquery_to_mssql.py    |   4 +-
 ...ry_to_mssql.py => example_bigquery_to_mysql.py} |  21 ++--
 ...to_mssql.py => example_bigquery_to_postgres.py} |  23 ++--
 14 files changed, 200 insertions(+), 255 deletions(-)
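
For context, the new operator is used like the other BigQuery transfer operators. The following is a minimal sketch modelled on the example DAG added under tests/system in this commit; the dataset, table, and connection names are illustrative only:

    from datetime import datetime

    from airflow import models
    from airflow.providers.google.cloud.transfers.bigquery_to_postgres import BigQueryToPostgresOperator

    with models.DAG(
        "example_bigquery_to_postgres",
        schedule="@once",
        start_date=datetime(2021, 1, 1),
        catchup=False,
    ) as dag:
        # Copy rows from a BigQuery <dataset>.<table> into an existing Postgres table.
        bigquery_to_postgres = BigQueryToPostgresOperator(
            task_id="bigquery_to_postgres",
            dataset_table="my_dataset.my_table",      # illustrative source table
            target_table_name="postgres_table_test",  # destination table in Postgres
            postgres_conn_id="postgres_default",
            replace=False,
        )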

diff --git a/airflow/providers/google/cloud/transfers/bigquery_to_mssql.py b/airflow/providers/google/cloud/transfers/bigquery_to_mssql.py
index 095eb874c7..0c7e58a458 100644
--- a/airflow/providers/google/cloud/transfers/bigquery_to_mssql.py
+++ b/airflow/providers/google/cloud/transfers/bigquery_to_mssql.py
@@ -18,19 +18,19 @@
 """This module contains Google BigQuery to MSSQL operator."""
 from __future__ import annotations
 
+import warnings
 from typing import TYPE_CHECKING, Sequence
 
-from airflow.models import BaseOperator
-from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
+from airflow.exceptions import AirflowProviderDeprecationWarning
 from airflow.providers.google.cloud.links.bigquery import BigQueryTableLink
-from airflow.providers.google.cloud.utils.bigquery_get_data import bigquery_get_data
+from airflow.providers.google.cloud.transfers.bigquery_to_sql import BigQueryToSqlBaseOperator
 from airflow.providers.microsoft.mssql.hooks.mssql import MsSqlHook
 
 if TYPE_CHECKING:
     from airflow.utils.context import Context
 
 
-class BigQueryToMsSqlOperator(BaseOperator):
+class BigQueryToMsSqlOperator(BigQueryToSqlBaseOperator):
     """
     Fetches the data from a BigQuery table (alternatively fetch data for selected columns)
     and insert that data into a MSSQL table.
@@ -39,86 +39,62 @@ class BigQueryToMsSqlOperator(BaseOperator):
         For more information on how to use this operator, take a look at the guide:
         :ref:`howto/operator:BigQueryToMsSqlOperator`
 
-    .. note::
-        If you pass fields to ``selected_fields`` which are in different order than the
-        order of columns already in
-        BQ table, the data will still be in the order of BQ table.
-        For example if the BQ table has 3 columns as
-        ``[A,B,C]`` and you pass 'B,A' in the ``selected_fields``
-        the data would still be of the form ``'A,B'`` and passed through this form
-        to MSSQL
-
-    **Example**: ::
-
-       transfer_data = BigQueryToMsSqlOperator(
-            task_id='task_id',
-            source_project_dataset_table='my-project.mydataset.mytable',
-            mssql_table='dest_table_name',
-            replace=True,
-        )
-
     :param source_project_dataset_table: A dotted ``<project>.<dataset>.<table>``:
         the big query table of origin
-    :param selected_fields: List of fields to return (comma-separated). If
-        unspecified, all fields are returned.
-    :param gcp_conn_id: reference to a specific Google Cloud hook.
+    :param mssql_table: target MsSQL table. It is deprecated: use target_table_name instead. (templated)
+    :param target_table_name: target MsSQL table. It takes precedence over mssql_table. (templated)
     :param mssql_conn_id: reference to a specific mssql hook
-    :param database: name of database which overwrite defined one in connection
-    :param replace: Whether to replace instead of insert
-    :param batch_size: The number of rows to take in each batch
-    :param location: The location used for the operation.
-    :param impersonation_chain: Optional service account to impersonate using short-term
-        credentials, or chained list of accounts required to get the access_token
-        of the last account in the list, which will be impersonated in the request.
-        If set as a string, the account must grant the originating account
-        the Service Account Token Creator IAM role.
-        If set as a sequence, the identities from the list must grant
-        Service Account Token Creator IAM role to the directly preceding identity, with first
-        account from the list granting this role to the originating account (templated).
     """
 
-    template_fields: Sequence[str] = ("source_project_dataset_table", "mssql_table", "impersonation_chain")
+    template_fields: Sequence[str] = tuple(BigQueryToSqlBaseOperator.template_fields) + (
+        "source_project_dataset_table",
+    )
     operator_extra_links = (BigQueryTableLink(),)
 
     def __init__(
         self,
         *,
         source_project_dataset_table: str,
-        mssql_table: str,
-        selected_fields: list[str] | str | None = None,
-        gcp_conn_id: str = "google_cloud_default",
+        mssql_table: str | None = None,
+        target_table_name: str | None = None,
         mssql_conn_id: str = "mssql_default",
-        database: str | None = None,
-        replace: bool = False,
-        batch_size: int = 1000,
-        location: str | None = None,
-        impersonation_chain: str | Sequence[str] | None = None,
         **kwargs,
     ) -> None:
-        super().__init__(**kwargs)
-        self.selected_fields = selected_fields
-        self.gcp_conn_id = gcp_conn_id
-        self.mssql_conn_id = mssql_conn_id
-        self.database = database
-        self.mssql_table = mssql_table
-        self.replace = replace
-        self.batch_size = batch_size
-        self.location = location
-        self.impersonation_chain = impersonation_chain
+        if mssql_table is not None:
+            warnings.warn(
+                # fmt: off
+                "The `mssql_table` parameter has been deprecated. "
+                "Use `target_table_name` instead.",
+                # fmt: on
+                AirflowProviderDeprecationWarning,
+            )
+
+            if target_table_name is not None:
+                raise ValueError(
+                    f"Cannot set both arguments: mssql_table={mssql_table!r} and "
+                    f"target_table_name={target_table_name!r}."
+                )
+
+            target_table_name = mssql_table
+
         try:
-            _, self.dataset_id, self.table_id = source_project_dataset_table.split(".")
+            _, dataset_id, table_id = source_project_dataset_table.split(".")
         except ValueError:
             raise ValueError(
                 f"Could not parse {source_project_dataset_table} as <project>.<dataset>.<table>"
             ) from None
+        super().__init__(
+            target_table_name=target_table_name,
+            dataset_table=f"{dataset_id}.{table_id}",
+            **kwargs,
+        )
+        self.mssql_conn_id = mssql_conn_id
         self.source_project_dataset_table = source_project_dataset_table
 
-    def execute(self, context: Context) -> None:
-        big_query_hook = BigQueryHook(
-            gcp_conn_id=self.gcp_conn_id,
-            location=self.location,
-            impersonation_chain=self.impersonation_chain,
-        )
+    def get_sql_hook(self) -> MsSqlHook:
+        return MsSqlHook(schema=self.database, mssql_conn_id=self.mssql_conn_id)
+
+    def persist_links(self, context: Context) -> None:
         project_id, dataset_id, table_id = self.source_project_dataset_table.split(".")
         BigQueryTableLink.persist(
             context=context,
@@ -127,18 +103,3 @@ class BigQueryToMsSqlOperator(BaseOperator):
             project_id=project_id,
             table_id=table_id,
         )
-        mssql_hook = MsSqlHook(mssql_conn_id=self.mssql_conn_id, schema=self.database)
-        for rows in bigquery_get_data(
-            self.log,
-            self.dataset_id,
-            self.table_id,
-            big_query_hook,
-            self.batch_size,
-            self.selected_fields,
-        ):
-            mssql_hook.insert_rows(
-                table=self.mssql_table,
-                rows=rows,
-                target_fields=self.selected_fields,
-                replace=self.replace,
-            )
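
The refactor above keeps the old mssql_table argument working: it is mapped onto target_table_name and an AirflowProviderDeprecationWarning is emitted, while passing both names raises ValueError. A hypothetical check of that behaviour (not part of this commit; table names are illustrative) could look like:

    import pytest

    from airflow.exceptions import AirflowProviderDeprecationWarning
    from airflow.providers.google.cloud.transfers.bigquery_to_mssql import BigQueryToMsSqlOperator

    # The deprecated keyword still works, but warns and is mapped onto target_table_name.
    with pytest.warns(AirflowProviderDeprecationWarning):
        op = BigQueryToMsSqlOperator(
            task_id="bq_to_mssql",
            source_project_dataset_table="my-project.my_dataset.my_table",
            mssql_table="dest_table",
        )
    assert op.target_table_name == "dest_table"

    # Supplying both the old and the new keyword is rejected.
    with pytest.raises(ValueError):
        BigQueryToMsSqlOperator(
            task_id="bq_to_mssql_both",
            source_project_dataset_table="my-project.my_dataset.my_table",
            mssql_table="dest_table",
            target_table_name="dest_table",
        )
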
diff --git a/airflow/providers/google/cloud/transfers/bigquery_to_mysql.py b/airflow/providers/google/cloud/transfers/bigquery_to_mysql.py
index e584127c7d..99b01fe9e8 100644
--- a/airflow/providers/google/cloud/transfers/bigquery_to_mysql.py
+++ b/airflow/providers/google/cloud/transfers/bigquery_to_mysql.py
@@ -18,18 +18,15 @@
 """This module contains Google BigQuery to MySQL operator."""
 from __future__ import annotations
 
-from typing import TYPE_CHECKING, Sequence
+import warnings
+from typing import Sequence
 
-from airflow.models import BaseOperator
-from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
-from airflow.providers.google.cloud.utils.bigquery_get_data import bigquery_get_data
+from airflow.exceptions import AirflowProviderDeprecationWarning
+from airflow.providers.google.cloud.transfers.bigquery_to_sql import BigQueryToSqlBaseOperator
 from airflow.providers.mysql.hooks.mysql import MySqlHook
 
-if TYPE_CHECKING:
-    from airflow.utils.context import Context
 
-
-class BigQueryToMySqlOperator(BaseOperator):
+class BigQueryToMySqlOperator(BigQueryToSqlBaseOperator):
     """
     Fetches the data from a BigQuery table (alternatively fetch data for selected columns)
     and insert that data into a MySQL table.
@@ -38,100 +35,41 @@ class BigQueryToMySqlOperator(BaseOperator):
         For more information on how to use this operator, take a look at the guide:
         :ref:`howto/operator:BigQueryToMySqlOperator`
 
-    .. note::
-        If you pass fields to ``selected_fields`` which are in different order than the
-        order of columns already in
-        BQ table, the data will still be in the order of BQ table.
-        For example if the BQ table has 3 columns as
-        ``[A,B,C]`` and you pass 'B,A' in the ``selected_fields``
-        the data would still be of the form ``'A,B'`` and passed through this form
-        to MySQL
-
-    **Example**: ::
-
-       # [START howto_operator_bigquery_to_mysql]
-       transfer_data = BigQueryToMySqlOperator(
-            task_id='task_id',
-            dataset_table='origin_bq_table',
-            mysql_table='dest_table_name',
-            replace=True,
-        )
-        # [END howto_operator_bigquery_to_mysql]
-
-    :param dataset_table: A dotted ``<dataset>.<table>``: the big query table of origin
-    :param selected_fields: List of fields to return (comma-separated). If
-        unspecified, all fields are returned.
-    :param gcp_conn_id: reference to a specific Google Cloud hook.
+    :param mysql_table: target MySQL table, use dot notation to target a
+        specific database. It is deprecated: use target_table_name instead. (templated)
+    :param target_table_name: target MySQL table. It takes precedence over mysql_table. (templated)
     :param mysql_conn_id: Reference to :ref:`mysql connection id <howto/connection:mysql>`.
-    :param database: name of database which overwrite defined one in connection
-    :param replace: Whether to replace instead of insert
-    :param batch_size: The number of rows to take in each batch
-    :param location: The location used for the operation.
-    :param impersonation_chain: Optional service account to impersonate using short-term
-        credentials, or chained list of accounts required to get the access_token
-        of the last account in the list, which will be impersonated in the request.
-        If set as a string, the account must grant the originating account
-        the Service Account Token Creator IAM role.
-        If set as a sequence, the identities from the list must grant
-        Service Account Token Creator IAM role to the directly preceding identity, with first
-        account from the list granting this role to the originating account (templated).
     """
 
-    template_fields: Sequence[str] = (
+    template_fields: Sequence[str] = tuple(BigQueryToSqlBaseOperator.template_fields) + (
         "dataset_id",
         "table_id",
-        "mysql_table",
-        "impersonation_chain",
     )
 
     def __init__(
         self,
         *,
-        dataset_table: str,
-        mysql_table: str,
-        selected_fields: list[str] | str | None = None,
-        gcp_conn_id: str = "google_cloud_default",
+        mysql_table: str | None = None,
+        target_table_name: str | None = None,
         mysql_conn_id: str = "mysql_default",
-        database: str | None = None,
-        replace: bool = False,
-        batch_size: int = 1000,
-        location: str | None = None,
-        impersonation_chain: str | Sequence[str] | None = None,
         **kwargs,
     ) -> None:
-        super().__init__(**kwargs)
-        self.selected_fields = selected_fields
-        self.gcp_conn_id = gcp_conn_id
+        if mysql_table is not None:
+            warnings.warn(
+                "The `mysql_table` parameter has been deprecated. Use `target_table_name` instead.",
+                AirflowProviderDeprecationWarning,
+            )
+
+            if target_table_name is not None:
+                raise ValueError(
+                    f"Cannot set both arguments: mysql_table={mysql_table!r} and "
+                    f"target_table_name={target_table_name!r}."
+                )
+
+            target_table_name = mysql_table
+
+        super().__init__(target_table_name=target_table_name, **kwargs)
         self.mysql_conn_id = mysql_conn_id
-        self.database = database
-        self.mysql_table = mysql_table
-        self.replace = replace
-        self.batch_size = batch_size
-        self.location = location
-        self.impersonation_chain = impersonation_chain
-        try:
-            self.dataset_id, self.table_id = dataset_table.split(".")
-        except ValueError:
-            raise ValueError(f"Could not parse {dataset_table} as <dataset>.<table>") from None
 
-    def execute(self, context: Context) -> None:
-        big_query_hook = BigQueryHook(
-            gcp_conn_id=self.gcp_conn_id,
-            location=self.location,
-            impersonation_chain=self.impersonation_chain,
-        )
-        mysql_hook = MySqlHook(schema=self.database, mysql_conn_id=self.mysql_conn_id)
-        for rows in bigquery_get_data(
-            self.log,
-            self.dataset_id,
-            self.table_id,
-            big_query_hook,
-            self.batch_size,
-            self.selected_fields,
-        ):
-            mysql_hook.insert_rows(
-                table=self.mysql_table,
-                rows=rows,
-                target_fields=self.selected_fields,
-                replace=self.replace,
-            )
+    def get_sql_hook(self) -> MySqlHook:
+        return MySqlHook(schema=self.database, mysql_conn_id=self.mysql_conn_id)
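
For existing DAGs that use BigQueryToMySqlOperator, migrating off the deprecated keyword is a one-line change; a minimal sketch with illustrative table and connection names:

    from airflow.providers.google.cloud.transfers.bigquery_to_mysql import BigQueryToMySqlOperator

    # Previously: BigQueryToMySqlOperator(..., mysql_table="dest_table")
    # Now the destination is named explicitly; the old keyword still works but warns.
    transfer = BigQueryToMySqlOperator(
        task_id="bq_to_mysql",
        dataset_table="my_dataset.my_table",  # <dataset>.<table> in BigQuery
        target_table_name="dest_table",       # replaces the deprecated mysql_table
        mysql_conn_id="mysql_default",
    )
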
diff --git a/airflow/providers/google/cloud/transfers/bigquery_to_postgres.py b/airflow/providers/google/cloud/transfers/bigquery_to_postgres.py
new file mode 100644
index 0000000000..34d6399620
--- /dev/null
+++ b/airflow/providers/google/cloud/transfers/bigquery_to_postgres.py
@@ -0,0 +1,56 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains Google BigQuery to PostgreSQL operator."""
+from __future__ import annotations
+
+from typing import Sequence
+
+from airflow.providers.google.cloud.transfers.bigquery_to_sql import BigQueryToSqlBaseOperator
+from airflow.providers.postgres.hooks.postgres import PostgresHook
+
+
+class BigQueryToPostgresOperator(BigQueryToSqlBaseOperator):
+    """
+    Fetches the data from a BigQuery table (alternatively fetch data for selected columns)
+    and insert that data into a PostgreSQL table.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:BigQueryToPostgresOperator`
+
+    :param target_table_name: target Postgres table (templated)
+    :param postgres_conn_id: Reference to :ref:`postgres connection id <howto/connection:postgres>`.
+    """
+
+    template_fields: Sequence[str] = tuple(BigQueryToSqlBaseOperator.template_fields) + (
+        "dataset_id",
+        "table_id",
+    )
+
+    def __init__(
+        self,
+        *,
+        target_table_name: str,
+        postgres_conn_id: str = "postgres_default",
+        **kwargs,
+    ) -> None:
+        super().__init__(target_table_name=target_table_name, **kwargs)
+        self.postgres_conn_id = postgres_conn_id
+
+    def get_sql_hook(self) -> PostgresHook:
+        return PostgresHook(schema=self.database, postgres_conn_id=self.postgres_conn_id)
diff --git a/airflow/providers/google/cloud/transfers/bigquery_to_mysql.py b/airflow/providers/google/cloud/transfers/bigquery_to_sql.py
similarity index 78%
copy from airflow/providers/google/cloud/transfers/bigquery_to_mysql.py
copy to airflow/providers/google/cloud/transfers/bigquery_to_sql.py
index e584127c7d..0e3d7a2a69 100644
--- a/airflow/providers/google/cloud/transfers/bigquery_to_mysql.py
+++ b/airflow/providers/google/cloud/transfers/bigquery_to_sql.py
@@ -15,28 +15,26 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-"""This module contains Google BigQuery to MySQL operator."""
+"""Base operator for BigQuery to SQL operators."""
 from __future__ import annotations
 
+import abc
 from typing import TYPE_CHECKING, Sequence
 
 from airflow.models import BaseOperator
+from airflow.providers.common.sql.hooks.sql import DbApiHook
 from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
 from airflow.providers.google.cloud.utils.bigquery_get_data import bigquery_get_data
-from airflow.providers.mysql.hooks.mysql import MySqlHook
 
 if TYPE_CHECKING:
     from airflow.utils.context import Context
 
 
-class BigQueryToMySqlOperator(BaseOperator):
+class BigQueryToSqlBaseOperator(BaseOperator):
     """
     Fetches the data from a BigQuery table (alternatively fetch data for selected columns)
-    and insert that data into a MySQL table.
-
-    .. seealso::
-        For more information on how to use this operator, take a look at the guide:
-        :ref:`howto/operator:BigQueryToMySqlOperator`
+    and insert that data into a SQL table. This is a BaseOperator; an abstract class. Refer
+    to children classes which are related to specific SQL databases (MySQL, MsSQL, Postgres...).
 
     .. note::
         If you pass fields to ``selected_fields`` which are in different order than the
@@ -45,24 +43,13 @@ class BigQueryToMySqlOperator(BaseOperator):
         For example if the BQ table has 3 columns as
         ``[A,B,C]`` and you pass 'B,A' in the ``selected_fields``
         the data would still be of the form ``'A,B'`` and passed through this form
-        to MySQL
-
-    **Example**: ::
-
-       # [START howto_operator_bigquery_to_mysql]
-       transfer_data = BigQueryToMySqlOperator(
-            task_id='task_id',
-            dataset_table='origin_bq_table',
-            mysql_table='dest_table_name',
-            replace=True,
-        )
-        # [END howto_operator_bigquery_to_mysql]
+        to the SQL database.
 
     :param dataset_table: A dotted ``<dataset>.<table>``: the big query table of origin
+    :param target_table_name: target SQL table
     :param selected_fields: List of fields to return (comma-separated). If
         unspecified, all fields are returned.
     :param gcp_conn_id: reference to a specific Google Cloud hook.
-    :param mysql_conn_id: Reference to :ref:`mysql connection id <howto/connection:mysql>`.
     :param database: name of database which overwrite defined one in connection
     :param replace: Whether to replace instead of insert
     :param batch_size: The number of rows to take in each batch
@@ -78,9 +65,7 @@ class BigQueryToMySqlOperator(BaseOperator):
     """
 
     template_fields: Sequence[str] = (
-        "dataset_id",
-        "table_id",
-        "mysql_table",
+        "target_table_name",
         "impersonation_chain",
     )
 
@@ -88,10 +73,9 @@ class BigQueryToMySqlOperator(BaseOperator):
         self,
         *,
         dataset_table: str,
-        mysql_table: str,
+        target_table_name: str | None,
         selected_fields: list[str] | str | None = None,
         gcp_conn_id: str = "google_cloud_default",
-        mysql_conn_id: str = "mysql_default",
         database: str | None = None,
         replace: bool = False,
         batch_size: int = 1000,
@@ -102,9 +86,8 @@ class BigQueryToMySqlOperator(BaseOperator):
         super().__init__(**kwargs)
         self.selected_fields = selected_fields
         self.gcp_conn_id = gcp_conn_id
-        self.mysql_conn_id = mysql_conn_id
         self.database = database
-        self.mysql_table = mysql_table
+        self.target_table_name = target_table_name
         self.replace = replace
         self.batch_size = batch_size
         self.location = location
@@ -114,13 +97,21 @@ class BigQueryToMySqlOperator(BaseOperator):
         except ValueError:
             raise ValueError(f"Could not parse {dataset_table} as <dataset>.<table>") from None
 
+    @abc.abstractmethod
+    def get_sql_hook(self) -> DbApiHook:
+        """Return a concrete SQL Hook (a PostgresHook for instance)."""
+
+    def persist_links(self, context: Context) -> None:
+        """This function persists the connection to the SQL provider."""
+
     def execute(self, context: Context) -> None:
         big_query_hook = BigQueryHook(
             gcp_conn_id=self.gcp_conn_id,
             location=self.location,
             impersonation_chain=self.impersonation_chain,
         )
-        mysql_hook = MySqlHook(schema=self.database, mysql_conn_id=self.mysql_conn_id)
+        self.persist_links(context)
+        sql_hook = self.get_sql_hook()
         for rows in bigquery_get_data(
             self.log,
             self.dataset_id,
@@ -129,8 +120,8 @@ class BigQueryToMySqlOperator(BaseOperator):
             self.batch_size,
             self.selected_fields,
         ):
-            mysql_hook.insert_rows(
-                table=self.mysql_table,
+            sql_hook.insert_rows(
+                table=self.target_table_name,
                 rows=rows,
                 target_fields=self.selected_fields,
                 replace=self.replace,
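
The new base class factors the BigQuery side (hook creation, impersonation, batched reads via bigquery_get_data) out of the per-database operators, so a concrete transfer operator only needs to supply a DB-API hook and, optionally, override persist_links for extra links. A hypothetical subclass for another database, shown purely to illustrate the extension point (SqliteHook and its parameters are an assumption, not part of this commit):

    from airflow.providers.google.cloud.transfers.bigquery_to_sql import BigQueryToSqlBaseOperator
    from airflow.providers.sqlite.hooks.sqlite import SqliteHook  # assumed available


    class BigQueryToSqliteOperator(BigQueryToSqlBaseOperator):
        """Hypothetical operator copying BigQuery rows into a SQLite table."""

        def __init__(self, *, target_table_name: str, sqlite_conn_id: str = "sqlite_default", **kwargs) -> None:
            super().__init__(target_table_name=target_table_name, **kwargs)
            self.sqlite_conn_id = sqlite_conn_id

        def get_sql_hook(self) -> SqliteHook:
            # Called by execute(); the returned hook's insert_rows() receives each batch.
            return SqliteHook(sqlite_conn_id=self.sqlite_conn_id)
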
diff --git a/airflow/providers/google/provider.yaml b/airflow/providers/google/provider.yaml
index 1ed1c35c24..384eb47c79 100644
--- a/airflow/providers/google/provider.yaml
+++ b/airflow/providers/google/provider.yaml
@@ -936,6 +936,9 @@ transfers:
   - source-integration-name: PostgreSQL
     target-integration-name: Google Cloud Storage (GCS)
     python-module: airflow.providers.google.cloud.transfers.postgres_to_gcs
+  - source-integration-name: Google BigQuery
+    target-integration-name: Common SQL
+    python-module: airflow.providers.google.cloud.transfers.bigquery_to_sql
   - source-integration-name: Google BigQuery
     target-integration-name: MySQL
     how-to-guide: /docs/apache-airflow-providers-google/operators/transfer/bigquery_to_mysql.rst
@@ -944,6 +947,10 @@ transfers:
     target-integration-name: Microsoft SQL Server (MSSQL)
     how-to-guide: /docs/apache-airflow-providers-google/operators/transfer/bigquery_to_mssql.rst
     python-module: airflow.providers.google.cloud.transfers.bigquery_to_mssql
+  - source-integration-name: Google BigQuery
+    target-integration-name: PostgreSQL
+    how-to-guide: /docs/apache-airflow-providers-google/operators/transfer/bigquery_to_postgres.rst
+    python-module: airflow.providers.google.cloud.transfers.bigquery_to_postgres
   - source-integration-name: Google Cloud Storage (GCS)
     target-integration-name: Google BigQuery
     how-to-guide: /docs/apache-airflow-providers-google/operators/transfer/gcs_to_bigquery.rst
diff --git a/docs/apache-airflow-providers-google/operators/transfer/bigquery_to_mysql.rst b/docs/apache-airflow-providers-google/operators/transfer/bigquery_to_mysql.rst
index 36c9b6b928..b113b56087 100644
--- a/docs/apache-airflow-providers-google/operators/transfer/bigquery_to_mysql.rst
+++ b/docs/apache-airflow-providers-google/operators/transfer/bigquery_to_mysql.rst
@@ -51,7 +51,7 @@ Transferring data
 
 The following Operator copies data from a BigQuery table to MySQL.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/transfers/bigquery_to_mysql.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/bigquery/example_bigquery_to_mysql.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_bigquery_to_mysql]
diff --git a/docs/apache-airflow-providers-google/operators/transfer/bigquery_to_mysql.rst b/docs/apache-airflow-providers-google/operators/transfer/bigquery_to_postgres.rst
similarity index 67%
copy from docs/apache-airflow-providers-google/operators/transfer/bigquery_to_mysql.rst
copy to docs/apache-airflow-providers-google/operators/transfer/bigquery_to_postgres.rst
index 36c9b6b928..d19a28c8ea 100644
--- a/docs/apache-airflow-providers-google/operators/transfer/bigquery_to_mysql.rst
+++ b/docs/apache-airflow-providers-google/operators/transfer/bigquery_to_postgres.rst
@@ -16,13 +16,13 @@
     under the License.
 
 
-Google Cloud BigQuery Transfer Operator to MySQL
-================================================
+Google Cloud BigQuery Transfer Operator to Postgres
+===================================================
 
 `Google Cloud BigQuery <https://cloud.google.com/bigquery>`__ is Google Cloud's serverless
 data warehouse offering.
-`MySQL <https://www.mysql.com/>`__ is an open-source relational database management system.
-This operator can be used to copy data from a BigQuery table to MySQL.
+`PostgreSQL <https://www.postgresql.org/>`__ is an open-source relational database management system.
+This operator can be used to copy data from a BigQuery table to PostgreSQL.
 
 
 Prerequisite Tasks
@@ -30,16 +30,16 @@ Prerequisite Tasks
 
 .. include::/operators/_partials/prerequisite_tasks.rst
 
-.. _howto/operator:BigQueryToMySqlOperator:
+.. _howto/operator:BigQueryToPostgresOperator:
 
 Operator
 ^^^^^^^^
 
-Copying data from one BigQuery table to another is performed with the
-:class:`~airflow.providers.google.cloud.transfers.bigquery_to_mysql.BigQueryToMySqlOperator` operator.
+Copying data from BigQuery table to Postgres table is performed with the
+:class:`~airflow.providers.google.cloud.transfers.bigquery_to_postgres.BigQueryToPostgresOperator` operator.
 
 Use :ref:`Jinja templating <concepts:jinja-templating>` with
-:template-fields:`airflow.providers.google.cloud.transfers.bigquery_to_mysql.BigQueryToMySqlOperator`
+:template-fields:`airflow.providers.google.cloud.transfers.bigquery_to_postgres.BigQueryToPostgresOperator`
 to define values dynamically.
 
 You may use the parameter ``selected_fields`` to limit the fields to be copied (all fields by default),
@@ -49,13 +49,13 @@ For more information, please refer to the links above.
 Transferring data
 -----------------
 
-The following Operator copies data from a BigQuery table to MySQL.
+The following Operator copies data from a BigQuery table to PostgreSQL.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/transfers/bigquery_to_mysql.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/bigquery/example_bigquery_to_postgres.py
     :language: python
     :dedent: 4
-    :start-after: [START howto_operator_bigquery_to_mysql]
-    :end-before: [END howto_operator_bigquery_to_mysql]
+    :start-after: [START howto_operator_bigquery_to_postgres]
+    :end-before: [END howto_operator_bigquery_to_postgres]
 
 
 Reference
@@ -64,4 +64,4 @@ Reference
 For further information, look at:
 
 * `Google Cloud BigQuery Documentation <https://cloud.google.com/bigquery/docs/>`__
-* `MySQL Documentation <https://dev.mysql.com/doc/>`__
+* `PostgreSQL Documentation <https://www.postgresql.org/docs/>`__
diff --git a/tests/always/test_project_structure.py b/tests/always/test_project_structure.py
index be54136507..9f0d36b4e2 100644
--- a/tests/always/test_project_structure.py
+++ b/tests/always/test_project_structure.py
@@ -274,6 +274,7 @@ class TestGoogleProviderProjectStructure(ExampleCoverageTest, AssetsCoverageTest
 
     BASE_CLASSES = {
         "airflow.providers.google.cloud.operators.compute.ComputeEngineBaseOperator",
+        "airflow.providers.google.cloud.transfers.bigquery_to_sql.BigQueryToSqlBaseOperator",
         "airflow.providers.google.cloud.operators.cloud_sql.CloudSQLBaseOperator",
         "airflow.providers.google.cloud.operators.dataproc.DataprocJobBaseOperator",
         "airflow.providers.google.cloud.operators.vertex_ai.custom_job.CustomTrainingJobBaseOperator",
@@ -285,7 +286,6 @@ class TestGoogleProviderProjectStructure(ExampleCoverageTest, AssetsCoverageTest
         "airflow.providers.google.cloud.operators.dlp.CloudDLPRedactImageOperator",
         "airflow.providers.google.cloud.transfers.cassandra_to_gcs.CassandraToGCSOperator",
         "airflow.providers.google.cloud.transfers.adls_to_gcs.ADLSToGCSOperator",
-        "airflow.providers.google.cloud.transfers.bigquery_to_mysql.BigQueryToMySqlOperator",
         "airflow.providers.google.cloud.transfers.sql_to_gcs.BaseSQLToGCSOperator",
         "airflow.providers.google.cloud.operators.vertex_ai.endpoint_service.GetEndpointOperator",
         "airflow.providers.google.cloud.operators.vertex_ai.auto_ml.AutoMLTrainingJobBaseOperator",
diff --git a/tests/providers/google/cloud/transfers/test_bigquery_to_mssql.py b/tests/providers/google/cloud/transfers/test_bigquery_to_mssql.py
index 1fd5105767..0e924c080a 100644
--- a/tests/providers/google/cloud/transfers/test_bigquery_to_mssql.py
+++ b/tests/providers/google/cloud/transfers/test_bigquery_to_mssql.py
@@ -36,18 +36,17 @@ TEST_DAG_ID = "test-bigquery-operators"
 
 @pytest.mark.backend("mssql")
 class TestBigQueryToMsSqlOperator:
-    @mock.patch("airflow.providers.google.cloud.transfers.bigquery_to_mssql.BigQueryHook")
+    @mock.patch("airflow.providers.google.cloud.transfers.bigquery_to_sql.BigQueryHook")
     def test_execute_good_request_to_bq(self, mock_hook):
         destination_table = "table"
         operator = BigQueryToMsSqlOperator(
             task_id=TASK_ID,
             source_project_dataset_table=f"{TEST_PROJECT_ID}.{TEST_DATASET}.{TEST_TABLE_ID}",
-            mssql_table=destination_table,
+            target_table_name=destination_table,
             replace=False,
         )
 
         operator.execute(context=mock.MagicMock())
-        # fmt: off
         mock_hook.return_value.list_rows.assert_called_once_with(
             dataset_id=TEST_DATASET,
             table_id=TEST_TABLE_ID,
@@ -55,4 +54,3 @@ class TestBigQueryToMsSqlOperator:
             selected_fields=None,
             start_index=0,
         )
-        # fmt: on
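
These test changes are mechanical: BigQueryHook is now instantiated inside the shared bigquery_to_sql module, and mock.patch must target the module where the name is looked up, so the patch target moves accordingly. For example:

    from unittest import mock

    # Patch the hook where the base operator imports it; the old targets
    # (bigquery_to_mssql.BigQueryHook / bigquery_to_mysql.BigQueryHook) no longer exist.
    @mock.patch("airflow.providers.google.cloud.transfers.bigquery_to_sql.BigQueryHook")
    def test_execute(mock_hook):
        ...
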
diff --git a/tests/providers/google/cloud/transfers/test_bigquery_to_mysql.py b/tests/providers/google/cloud/transfers/test_bigquery_to_mysql.py
index 720961d3c4..3e24bef38d 100644
--- a/tests/providers/google/cloud/transfers/test_bigquery_to_mysql.py
+++ b/tests/providers/google/cloud/transfers/test_bigquery_to_mysql.py
@@ -28,18 +28,17 @@ TEST_DAG_ID = "test-bigquery-operators"
 
 
 class TestBigQueryToMySqlOperator:
-    @mock.patch("airflow.providers.google.cloud.transfers.bigquery_to_mysql.BigQueryHook")
+    @mock.patch("airflow.providers.google.cloud.transfers.bigquery_to_sql.BigQueryHook")
     def test_execute_good_request_to_bq(self, mock_hook):
         destination_table = "table"
         operator = BigQueryToMySqlOperator(
             task_id=TASK_ID,
             dataset_table=f"{TEST_DATASET}.{TEST_TABLE_ID}",
-            mysql_table=destination_table,
+            target_table_name=destination_table,
             replace=False,
         )
 
         operator.execute(None)
-        # fmt: off
         mock_hook.return_value.list_rows.assert_called_once_with(
             dataset_id=TEST_DATASET,
             table_id=TEST_TABLE_ID,
@@ -47,4 +46,3 @@ class TestBigQueryToMySqlOperator:
             selected_fields=None,
             start_index=0,
         )
-        # fmt: on
diff --git a/tests/providers/google/cloud/transfers/test_bigquery_to_mysql.py b/tests/providers/google/cloud/transfers/test_bigquery_to_postgres.py
similarity index 82%
copy from tests/providers/google/cloud/transfers/test_bigquery_to_mysql.py
copy to tests/providers/google/cloud/transfers/test_bigquery_to_postgres.py
index 720961d3c4..fc492f7ea6 100644
--- a/tests/providers/google/cloud/transfers/test_bigquery_to_mysql.py
+++ b/tests/providers/google/cloud/transfers/test_bigquery_to_postgres.py
@@ -19,7 +19,7 @@ from __future__ import annotations
 
 from unittest import mock
 
-from airflow.providers.google.cloud.transfers.bigquery_to_mysql import BigQueryToMySqlOperator
+from airflow.providers.google.cloud.transfers.bigquery_to_postgres import BigQueryToPostgresOperator
 
 TASK_ID = "test-bq-create-table-operator"
 TEST_DATASET = "test-dataset"
@@ -27,19 +27,18 @@ TEST_TABLE_ID = "test-table-id"
 TEST_DAG_ID = "test-bigquery-operators"
 
 
-class TestBigQueryToMySqlOperator:
-    @mock.patch("airflow.providers.google.cloud.transfers.bigquery_to_mysql.BigQueryHook")
+class TestBigQueryToPostgresOperator:
+    @mock.patch("airflow.providers.google.cloud.transfers.bigquery_to_sql.BigQueryHook")
     def test_execute_good_request_to_bq(self, mock_hook):
         destination_table = "table"
-        operator = BigQueryToMySqlOperator(
+        operator = BigQueryToPostgresOperator(
             task_id=TASK_ID,
             dataset_table=f"{TEST_DATASET}.{TEST_TABLE_ID}",
-            mysql_table=destination_table,
+            target_table_name=destination_table,
             replace=False,
         )
 
-        operator.execute(None)
-        # fmt: off
+        operator.execute(context=mock.MagicMock())
         mock_hook.return_value.list_rows.assert_called_once_with(
             dataset_id=TEST_DATASET,
             table_id=TEST_TABLE_ID,
@@ -47,4 +46,3 @@ class TestBigQueryToMySqlOperator:
             selected_fields=None,
             start_index=0,
         )
-        # fmt: on
diff --git a/tests/system/providers/google/cloud/bigquery/example_bigquery_to_mssql.py b/tests/system/providers/google/cloud/bigquery/example_bigquery_to_mssql.py
index 9b9a69fc43..b91e5881ac 100644
--- a/tests/system/providers/google/cloud/bigquery/example_bigquery_to_mssql.py
+++ b/tests/system/providers/google/cloud/bigquery/example_bigquery_to_mssql.py
@@ -35,7 +35,7 @@ from airflow.providers.google.cloud.operators.bigquery import (
 try:
     from airflow.providers.google.cloud.transfers.bigquery_to_mssql import BigQueryToMsSqlOperator
 except ImportError:
-    pytest.skip("MySQL not available", allow_module_level=True)
+    pytest.skip("MsSQL not available", allow_module_level=True)
 
 ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
 PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
@@ -57,7 +57,7 @@ with models.DAG(
     bigquery_to_mssql = BigQueryToMsSqlOperator(
         task_id="bigquery_to_mssql",
         source_project_dataset_table=f"{PROJECT_ID}.{DATASET_NAME}.{TABLE}",
-        mssql_table=destination_table,
+        target_table_name=destination_table,
         replace=False,
     )
     # [END howto_operator_bigquery_to_mssql]
diff --git a/tests/system/providers/google/cloud/bigquery/example_bigquery_to_mssql.py b/tests/system/providers/google/cloud/bigquery/example_bigquery_to_mysql.py
similarity index 83%
copy from tests/system/providers/google/cloud/bigquery/example_bigquery_to_mssql.py
copy to tests/system/providers/google/cloud/bigquery/example_bigquery_to_mysql.py
index 9b9a69fc43..11b898a569 100644
--- a/tests/system/providers/google/cloud/bigquery/example_bigquery_to_mssql.py
+++ b/tests/system/providers/google/cloud/bigquery/example_bigquery_to_mysql.py
@@ -33,18 +33,17 @@ from airflow.providers.google.cloud.operators.bigquery import (
 )
 
 try:
-    from airflow.providers.google.cloud.transfers.bigquery_to_mssql import BigQueryToMsSqlOperator
+    from airflow.providers.google.cloud.transfers.bigquery_to_mysql import BigQueryToMySqlOperator
 except ImportError:
     pytest.skip("MySQL not available", allow_module_level=True)
 
 ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
-PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
-DAG_ID = "example_bigquery_to_mssql"
+DAG_ID = "example_bigquery_to_mysql"
 
 DATASET_NAME = f"dataset_{DAG_ID}_{ENV_ID}"
 DATA_EXPORT_BUCKET_NAME = os.environ.get("GCP_BIGQUERY_EXPORT_BUCKET_NAME", "INVALID BUCKET NAME")
 TABLE = "table_42"
-destination_table = "mssql_table_test"
+destination_table = "mysql_table_test"
 
 with models.DAG(
     DAG_ID,
@@ -53,14 +52,14 @@ with models.DAG(
     catchup=False,
     tags=["example", "bigquery"],
 ) as dag:
-    # [START howto_operator_bigquery_to_mssql]
-    bigquery_to_mssql = BigQueryToMsSqlOperator(
-        task_id="bigquery_to_mssql",
-        source_project_dataset_table=f"{PROJECT_ID}.{DATASET_NAME}.{TABLE}",
-        mssql_table=destination_table,
+    # [START howto_operator_bigquery_to_mysql]
+    bigquery_to_mysql = BigQueryToMySqlOperator(
+        task_id="bigquery_to_mysql",
+        dataset_table=f"{DATASET_NAME}.{TABLE}",
+        target_table_name=destination_table,
         replace=False,
     )
-    # [END howto_operator_bigquery_to_mssql]
+    # [END howto_operator_bigquery_to_mysql]
 
     create_dataset = BigQueryCreateEmptyDatasetOperator(task_id="create_dataset", dataset_id=DATASET_NAME)
 
@@ -83,7 +82,7 @@ with models.DAG(
         create_dataset
         >> create_table
         # TEST BODY
-        >> bigquery_to_mssql
+        >> bigquery_to_mysql
         # TEST TEARDOWN
         >> delete_dataset
     )
diff --git a/tests/system/providers/google/cloud/bigquery/example_bigquery_to_mssql.py b/tests/system/providers/google/cloud/bigquery/example_bigquery_to_postgres.py
similarity index 81%
copy from tests/system/providers/google/cloud/bigquery/example_bigquery_to_mssql.py
copy to tests/system/providers/google/cloud/bigquery/example_bigquery_to_postgres.py
index 9b9a69fc43..80aa19f99c 100644
--- a/tests/system/providers/google/cloud/bigquery/example_bigquery_to_mssql.py
+++ b/tests/system/providers/google/cloud/bigquery/example_bigquery_to_postgres.py
@@ -33,18 +33,17 @@ from airflow.providers.google.cloud.operators.bigquery import (
 )
 
 try:
-    from airflow.providers.google.cloud.transfers.bigquery_to_mssql import BigQueryToMsSqlOperator
+    from airflow.providers.google.cloud.transfers.bigquery_to_postgres import BigQueryToPostgresOperator
 except ImportError:
-    pytest.skip("MySQL not available", allow_module_level=True)
+    pytest.skip("PostgreSQL not available", allow_module_level=True)
 
 ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
-PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
-DAG_ID = "example_bigquery_to_mssql"
+DAG_ID = "example_bigquery_to_postgres"
 
 DATASET_NAME = f"dataset_{DAG_ID}_{ENV_ID}"
 DATA_EXPORT_BUCKET_NAME = os.environ.get("GCP_BIGQUERY_EXPORT_BUCKET_NAME", "INVALID BUCKET NAME")
 TABLE = "table_42"
-destination_table = "mssql_table_test"
+destination_table = "postgres_table_test"
 
 with models.DAG(
     DAG_ID,
@@ -53,14 +52,14 @@ with models.DAG(
     catchup=False,
     tags=["example", "bigquery"],
 ) as dag:
-    # [START howto_operator_bigquery_to_mssql]
-    bigquery_to_mssql = BigQueryToMsSqlOperator(
-        task_id="bigquery_to_mssql",
-        source_project_dataset_table=f"{PROJECT_ID}.{DATASET_NAME}.{TABLE}",
-        mssql_table=destination_table,
+    # [START howto_operator_bigquery_to_postgres]
+    bigquery_to_postgres = BigQueryToPostgresOperator(
+        task_id="bigquery_to_postgres",
+        dataset_table=f"{DATASET_NAME}.{TABLE}",
+        target_table_name=destination_table,
         replace=False,
     )
-    # [END howto_operator_bigquery_to_mssql]
+    # [END howto_operator_bigquery_to_postgres]
 
     create_dataset = BigQueryCreateEmptyDatasetOperator(task_id="create_dataset", dataset_id=DATASET_NAME)
 
@@ -83,7 +82,7 @@ with models.DAG(
         create_dataset
         >> create_table
         # TEST BODY
-        >> bigquery_to_mssql
+        >> bigquery_to_postgres
         # TEST TEARDOWN
         >> delete_dataset
     )

