This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new cf1e26b046 Add BigQueryToPostgresOperator (#30658)
cf1e26b046 is described below
commit cf1e26b04669ad2b232618fee623e6bc7797a13a
Author: Jean-Baptiste Braun <[email protected]>
AuthorDate: Tue May 16 13:20:57 2023 +0200
Add BigQueryToPostgresOperator (#30658)
Co-authored-by: eladkal <[email protected]>
---
.../google/cloud/transfers/bigquery_to_mssql.py | 117 +++++++-------------
.../google/cloud/transfers/bigquery_to_mysql.py | 118 +++++----------------
.../google/cloud/transfers/bigquery_to_postgres.py | 56 ++++++++++
.../{bigquery_to_mysql.py => bigquery_to_sql.py} | 53 ++++-----
airflow/providers/google/provider.yaml | 7 ++
.../operators/transfer/bigquery_to_mysql.rst | 2 +-
...query_to_mysql.rst => bigquery_to_postgres.rst} | 26 ++---
tests/always/test_project_structure.py | 2 +-
.../cloud/transfers/test_bigquery_to_mssql.py | 6 +-
.../cloud/transfers/test_bigquery_to_mysql.py | 6 +-
...ry_to_mysql.py => test_bigquery_to_postgres.py} | 14 ++-
.../cloud/bigquery/example_bigquery_to_mssql.py | 4 +-
...ry_to_mssql.py => example_bigquery_to_mysql.py} | 21 ++--
...to_mssql.py => example_bigquery_to_postgres.py} | 23 ++--
14 files changed, 200 insertions(+), 255 deletions(-)
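For context, the headline addition is a new transfer operator; a minimal usage sketch based on the system test added below (project, dataset, and table identifiers are illustrative placeholders, not part of the commit):

    from airflow.providers.google.cloud.transfers.bigquery_to_postgres import (
        BigQueryToPostgresOperator,
    )

    # Copy rows from a BigQuery table into an existing Postgres table.
    bigquery_to_postgres = BigQueryToPostgresOperator(
        task_id="bigquery_to_postgres",
        dataset_table="my_dataset.my_table",  # BigQuery <dataset>.<table> of origin
        target_table_name="postgres_table_test",  # destination Postgres table
        postgres_conn_id="postgres_default",
        replace=False,
    )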
diff --git a/airflow/providers/google/cloud/transfers/bigquery_to_mssql.py b/airflow/providers/google/cloud/transfers/bigquery_to_mssql.py
index 095eb874c7..0c7e58a458 100644
--- a/airflow/providers/google/cloud/transfers/bigquery_to_mssql.py
+++ b/airflow/providers/google/cloud/transfers/bigquery_to_mssql.py
@@ -18,19 +18,19 @@
"""This module contains Google BigQuery to MSSQL operator."""
from __future__ import annotations
+import warnings
from typing import TYPE_CHECKING, Sequence
-from airflow.models import BaseOperator
-from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
+from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.providers.google.cloud.links.bigquery import BigQueryTableLink
-from airflow.providers.google.cloud.utils.bigquery_get_data import bigquery_get_data
+from airflow.providers.google.cloud.transfers.bigquery_to_sql import BigQueryToSqlBaseOperator
from airflow.providers.microsoft.mssql.hooks.mssql import MsSqlHook
if TYPE_CHECKING:
from airflow.utils.context import Context
-class BigQueryToMsSqlOperator(BaseOperator):
+class BigQueryToMsSqlOperator(BigQueryToSqlBaseOperator):
"""
Fetches the data from a BigQuery table (alternatively fetch data for selected columns)
and insert that data into a MSSQL table.
@@ -39,86 +39,62 @@ class BigQueryToMsSqlOperator(BaseOperator):
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:BigQueryToMsSqlOperator`
- .. note::
- If you pass fields to ``selected_fields`` which are in different order than the
- order of columns already in
- BQ table, the data will still be in the order of BQ table.
- For example if the BQ table has 3 columns as
- ``[A,B,C]`` and you pass 'B,A' in the ``selected_fields``
- the data would still be of the form ``'A,B'`` and passed through this form
- to MSSQL
-
- **Example**: ::
-
- transfer_data = BigQueryToMsSqlOperator(
- task_id='task_id',
- source_project_dataset_table='my-project.mydataset.mytable',
- mssql_table='dest_table_name',
- replace=True,
- )
-
:param source_project_dataset_table: A dotted ``<project>.<dataset>.<table>``:
the big query table of origin
- :param selected_fields: List of fields to return (comma-separated). If
- unspecified, all fields are returned.
- :param gcp_conn_id: reference to a specific Google Cloud hook.
+ :param mssql_table: target MsSQL table. It is deprecated: use target_table_name instead. (templated)
+ :param target_table_name: target MsSQL table. It takes precedence over mssql_table. (templated)
:param mssql_conn_id: reference to a specific mssql hook
- :param database: name of database which overwrite defined one in connection
- :param replace: Whether to replace instead of insert
- :param batch_size: The number of rows to take in each batch
- :param location: The location used for the operation.
- :param impersonation_chain: Optional service account to impersonate using short-term
- credentials, or chained list of accounts required to get the access_token
- of the last account in the list, which will be impersonated in the request.
- If set as a string, the account must grant the originating account
- the Service Account Token Creator IAM role.
- If set as a sequence, the identities from the list must grant
- Service Account Token Creator IAM role to the directly preceding identity, with first
- account from the list granting this role to the originating account (templated).
"""
- template_fields: Sequence[str] = ("source_project_dataset_table", "mssql_table", "impersonation_chain")
+ template_fields: Sequence[str] = tuple(BigQueryToSqlBaseOperator.template_fields) + (
+ "source_project_dataset_table",
+ )
operator_extra_links = (BigQueryTableLink(),)
def __init__(
self,
*,
source_project_dataset_table: str,
- mssql_table: str,
- selected_fields: list[str] | str | None = None,
- gcp_conn_id: str = "google_cloud_default",
+ mssql_table: str | None = None,
+ target_table_name: str | None = None,
mssql_conn_id: str = "mssql_default",
- database: str | None = None,
- replace: bool = False,
- batch_size: int = 1000,
- location: str | None = None,
- impersonation_chain: str | Sequence[str] | None = None,
**kwargs,
) -> None:
- super().__init__(**kwargs)
- self.selected_fields = selected_fields
- self.gcp_conn_id = gcp_conn_id
- self.mssql_conn_id = mssql_conn_id
- self.database = database
- self.mssql_table = mssql_table
- self.replace = replace
- self.batch_size = batch_size
- self.location = location
- self.impersonation_chain = impersonation_chain
+ if mssql_table is not None:
+ warnings.warn(
+ # fmt: off
+ "The `mssql_table` parameter has been deprecated. "
+ "Use `target_table_name` instead.",
+ # fmt: on
+ AirflowProviderDeprecationWarning,
+ )
+
+ if target_table_name is not None:
+ raise ValueError(
+ f"Cannot set both arguments: mssql_table={mssql_table!r}
and "
+ f"target_table_name={target_table_name!r}."
+ )
+
+ target_table_name = mssql_table
+
try:
- _, self.dataset_id, self.table_id = source_project_dataset_table.split(".")
+ _, dataset_id, table_id = source_project_dataset_table.split(".")
except ValueError:
raise ValueError(
f"Could not parse {source_project_dataset_table} as
<project>.<dataset>.<table>"
) from None
+ super().__init__(
+ target_table_name=target_table_name,
+ dataset_table=f"{dataset_id}.{table_id}",
+ **kwargs,
+ )
+ self.mssql_conn_id = mssql_conn_id
self.source_project_dataset_table = source_project_dataset_table
- def execute(self, context: Context) -> None:
- big_query_hook = BigQueryHook(
- gcp_conn_id=self.gcp_conn_id,
- location=self.location,
- impersonation_chain=self.impersonation_chain,
- )
+ def get_sql_hook(self) -> MsSqlHook:
+ return MsSqlHook(schema=self.database, mssql_conn_id=self.mssql_conn_id)
+
+ def persist_links(self, context: Context) -> None:
project_id, dataset_id, table_id = self.source_project_dataset_table.split(".")
BigQueryTableLink.persist(
context=context,
@@ -127,18 +103,3 @@ class BigQueryToMsSqlOperator(BaseOperator):
project_id=project_id,
table_id=table_id,
)
- mssql_hook = MsSqlHook(mssql_conn_id=self.mssql_conn_id, schema=self.database)
- for rows in bigquery_get_data(
- self.log,
- self.dataset_id,
- self.table_id,
- big_query_hook,
- self.batch_size,
- self.selected_fields,
- ):
- mssql_hook.insert_rows(
- table=self.mssql_table,
- rows=rows,
- target_fields=self.selected_fields,
- replace=self.replace,
- )
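The shim above keeps existing DAGs working while steering users to the new parameter name; a hedged migration sketch (identifiers are illustrative; the ``mysql_table`` parameter of the MySQL operator below follows the same pattern):

    from airflow.providers.google.cloud.transfers.bigquery_to_mssql import BigQueryToMsSqlOperator

    # Deprecated spelling: still runs, but now emits AirflowProviderDeprecationWarning.
    transfer_old = BigQueryToMsSqlOperator(
        task_id="transfer_old",
        source_project_dataset_table="my-project.my_dataset.my_table",
        mssql_table="dest_table",
    )

    # Preferred spelling after this commit; passing both parameters raises ValueError.
    transfer_new = BigQueryToMsSqlOperator(
        task_id="transfer_new",
        source_project_dataset_table="my-project.my_dataset.my_table",
        target_table_name="dest_table",
    )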
diff --git a/airflow/providers/google/cloud/transfers/bigquery_to_mysql.py b/airflow/providers/google/cloud/transfers/bigquery_to_mysql.py
index e584127c7d..99b01fe9e8 100644
--- a/airflow/providers/google/cloud/transfers/bigquery_to_mysql.py
+++ b/airflow/providers/google/cloud/transfers/bigquery_to_mysql.py
@@ -18,18 +18,15 @@
"""This module contains Google BigQuery to MySQL operator."""
from __future__ import annotations
-from typing import TYPE_CHECKING, Sequence
+import warnings
+from typing import Sequence
-from airflow.models import BaseOperator
-from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
-from airflow.providers.google.cloud.utils.bigquery_get_data import bigquery_get_data
+from airflow.exceptions import AirflowProviderDeprecationWarning
+from airflow.providers.google.cloud.transfers.bigquery_to_sql import BigQueryToSqlBaseOperator
from airflow.providers.mysql.hooks.mysql import MySqlHook
-if TYPE_CHECKING:
- from airflow.utils.context import Context
-
-class BigQueryToMySqlOperator(BaseOperator):
+class BigQueryToMySqlOperator(BigQueryToSqlBaseOperator):
"""
Fetches the data from a BigQuery table (alternatively fetch data for selected columns)
and insert that data into a MySQL table.
@@ -38,100 +35,41 @@ class BigQueryToMySqlOperator(BaseOperator):
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:BigQueryToMySqlOperator`
- .. note::
- If you pass fields to ``selected_fields`` which are in different order than the
- order of columns already in
- BQ table, the data will still be in the order of BQ table.
- For example if the BQ table has 3 columns as
- ``[A,B,C]`` and you pass 'B,A' in the ``selected_fields``
- the data would still be of the form ``'A,B'`` and passed through this form
- to MySQL
-
- **Example**: ::
-
- # [START howto_operator_bigquery_to_mysql]
- transfer_data = BigQueryToMySqlOperator(
- task_id='task_id',
- dataset_table='origin_bq_table',
- mysql_table='dest_table_name',
- replace=True,
- )
- # [END howto_operator_bigquery_to_mysql]
-
- :param dataset_table: A dotted ``<dataset>.<table>``: the big query table of origin
- :param selected_fields: List of fields to return (comma-separated). If
- unspecified, all fields are returned.
- :param gcp_conn_id: reference to a specific Google Cloud hook.
+ :param mysql_table: target MySQL table, use dot notation to target a
+ specific database. It is deprecated: use target_table_name instead. (templated)
+ :param target_table_name: target MySQL table. It takes precedence over mysql_table. (templated)
:param mysql_conn_id: Reference to :ref:`mysql connection id <howto/connection:mysql>`.
- :param database: name of database which overwrite defined one in connection
- :param replace: Whether to replace instead of insert
- :param batch_size: The number of rows to take in each batch
- :param location: The location used for the operation.
- :param impersonation_chain: Optional service account to impersonate using short-term
- credentials, or chained list of accounts required to get the access_token
- of the last account in the list, which will be impersonated in the request.
- If set as a string, the account must grant the originating account
- the Service Account Token Creator IAM role.
- If set as a sequence, the identities from the list must grant
- Service Account Token Creator IAM role to the directly preceding identity, with first
- account from the list granting this role to the originating account (templated).
"""
- template_fields: Sequence[str] = (
+ template_fields: Sequence[str] = tuple(BigQueryToSqlBaseOperator.template_fields) + (
"dataset_id",
"table_id",
- "mysql_table",
- "impersonation_chain",
)
def __init__(
self,
*,
- dataset_table: str,
- mysql_table: str,
- selected_fields: list[str] | str | None = None,
- gcp_conn_id: str = "google_cloud_default",
+ mysql_table: str | None = None,
+ target_table_name: str | None = None,
mysql_conn_id: str = "mysql_default",
- database: str | None = None,
- replace: bool = False,
- batch_size: int = 1000,
- location: str | None = None,
- impersonation_chain: str | Sequence[str] | None = None,
**kwargs,
) -> None:
- super().__init__(**kwargs)
- self.selected_fields = selected_fields
- self.gcp_conn_id = gcp_conn_id
+ if mysql_table is not None:
+ warnings.warn(
+ "The `mysql_table` parameter has been deprecated. Use
`target_table_name` instead.",
+ AirflowProviderDeprecationWarning,
+ )
+
+ if target_table_name is not None:
+ raise ValueError(
+ f"Cannot set both arguments: mysql_table={mysql_table!r}
and "
+ f"target_table_name={target_table_name!r}."
+ )
+
+ target_table_name = mysql_table
+
+ super().__init__(target_table_name=target_table_name, **kwargs)
self.mysql_conn_id = mysql_conn_id
- self.database = database
- self.mysql_table = mysql_table
- self.replace = replace
- self.batch_size = batch_size
- self.location = location
- self.impersonation_chain = impersonation_chain
- try:
- self.dataset_id, self.table_id = dataset_table.split(".")
- except ValueError:
- raise ValueError(f"Could not parse {dataset_table} as <dataset>.<table>") from None
- def execute(self, context: Context) -> None:
- big_query_hook = BigQueryHook(
- gcp_conn_id=self.gcp_conn_id,
- location=self.location,
- impersonation_chain=self.impersonation_chain,
- )
- mysql_hook = MySqlHook(schema=self.database, mysql_conn_id=self.mysql_conn_id)
- for rows in bigquery_get_data(
- self.log,
- self.dataset_id,
- self.table_id,
- big_query_hook,
- self.batch_size,
- self.selected_fields,
- ):
- mysql_hook.insert_rows(
- table=self.mysql_table,
- rows=rows,
- target_fields=self.selected_fields,
- replace=self.replace,
- )
+ def get_sql_hook(self) -> MySqlHook:
+ return MySqlHook(schema=self.database, mysql_conn_id=self.mysql_conn_id)
diff --git a/airflow/providers/google/cloud/transfers/bigquery_to_postgres.py b/airflow/providers/google/cloud/transfers/bigquery_to_postgres.py
new file mode 100644
index 0000000000..34d6399620
--- /dev/null
+++ b/airflow/providers/google/cloud/transfers/bigquery_to_postgres.py
@@ -0,0 +1,56 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains Google BigQuery to PostgreSQL operator."""
+from __future__ import annotations
+
+from typing import Sequence
+
+from airflow.providers.google.cloud.transfers.bigquery_to_sql import BigQueryToSqlBaseOperator
+from airflow.providers.postgres.hooks.postgres import PostgresHook
+
+
+class BigQueryToPostgresOperator(BigQueryToSqlBaseOperator):
+ """
+ Fetches the data from a BigQuery table (alternatively fetch data for selected columns)
+ and insert that data into a PostgreSQL table.
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the guide:
+ :ref:`howto/operator:BigQueryToPostgresOperator`
+
+ :param target_table_name: target Postgres table (templated)
+ :param postgres_conn_id: Reference to :ref:`postgres connection id <howto/connection:postgres>`.
+ """
+
+ template_fields: Sequence[str] = tuple(BigQueryToSqlBaseOperator.template_fields) + (
+ "dataset_id",
+ "table_id",
+ )
+
+ def __init__(
+ self,
+ *,
+ target_table_name: str,
+ postgres_conn_id: str = "postgres_default",
+ **kwargs,
+ ) -> None:
+ super().__init__(target_table_name=target_table_name, **kwargs)
+ self.postgres_conn_id = postgres_conn_id
+
+ def get_sql_hook(self) -> PostgresHook:
+ return PostgresHook(schema=self.database, postgres_conn_id=self.postgres_conn_id)
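Parameters declared on the base class (``selected_fields``, ``replace``, ``batch_size``, ``database``, ``gcp_conn_id``, ``impersonation_chain``) pass straight through ``**kwargs``; a sketch with illustrative values:

    from airflow.providers.google.cloud.transfers.bigquery_to_postgres import (
        BigQueryToPostgresOperator,
    )

    BigQueryToPostgresOperator(
        task_id="bq_to_pg_selected",
        dataset_table="my_dataset.my_table",
        target_table_name="destination_table",
        selected_fields="id,name",  # comma-separated subset of BigQuery columns
        batch_size=500,  # rows fetched from BigQuery and inserted per round trip
        replace=False,
    )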
diff --git a/airflow/providers/google/cloud/transfers/bigquery_to_mysql.py b/airflow/providers/google/cloud/transfers/bigquery_to_sql.py
similarity index 78%
copy from airflow/providers/google/cloud/transfers/bigquery_to_mysql.py
copy to airflow/providers/google/cloud/transfers/bigquery_to_sql.py
index e584127c7d..0e3d7a2a69 100644
--- a/airflow/providers/google/cloud/transfers/bigquery_to_mysql.py
+++ b/airflow/providers/google/cloud/transfers/bigquery_to_sql.py
@@ -15,28 +15,26 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-"""This module contains Google BigQuery to MySQL operator."""
+"""Base operator for BigQuery to SQL operators."""
from __future__ import annotations
+import abc
from typing import TYPE_CHECKING, Sequence
from airflow.models import BaseOperator
+from airflow.providers.common.sql.hooks.sql import DbApiHook
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
from airflow.providers.google.cloud.utils.bigquery_get_data import bigquery_get_data
-from airflow.providers.mysql.hooks.mysql import MySqlHook
if TYPE_CHECKING:
from airflow.utils.context import Context
-class BigQueryToMySqlOperator(BaseOperator):
+class BigQueryToSqlBaseOperator(BaseOperator):
"""
Fetches the data from a BigQuery table (alternatively fetch data for selected columns)
- and insert that data into a MySQL table.
-
- .. seealso::
- For more information on how to use this operator, take a look at the guide:
- :ref:`howto/operator:BigQueryToMySqlOperator`
+ and insert that data into a SQL table. This is a BaseOperator; an abstract class. Refer
+ to children classes which are related to specific SQL databases (MySQL, MsSQL, Postgres...).
.. note::
If you pass fields to ``selected_fields`` which are in different order than the
@@ -45,24 +43,13 @@ class BigQueryToMySqlOperator(BaseOperator):
For example if the BQ table has 3 columns as
``[A,B,C]`` and you pass 'B,A' in the ``selected_fields``
the data would still be of the form ``'A,B'`` and passed through this form
- to MySQL
-
- **Example**: ::
-
- # [START howto_operator_bigquery_to_mysql]
- transfer_data = BigQueryToMySqlOperator(
- task_id='task_id',
- dataset_table='origin_bq_table',
- mysql_table='dest_table_name',
- replace=True,
- )
- # [END howto_operator_bigquery_to_mysql]
+ to the SQL database.
:param dataset_table: A dotted ``<dataset>.<table>``: the big query table of origin
+ :param target_table_name: target SQL table
:param selected_fields: List of fields to return (comma-separated). If
unspecified, all fields are returned.
:param gcp_conn_id: reference to a specific Google Cloud hook.
- :param mysql_conn_id: Reference to :ref:`mysql connection id <howto/connection:mysql>`.
:param database: name of database which overwrite defined one in connection
:param replace: Whether to replace instead of insert
:param batch_size: The number of rows to take in each batch
@@ -78,9 +65,7 @@ class BigQueryToMySqlOperator(BaseOperator):
"""
template_fields: Sequence[str] = (
- "dataset_id",
- "table_id",
- "mysql_table",
+ "target_table_name",
"impersonation_chain",
)
@@ -88,10 +73,9 @@ class BigQueryToMySqlOperator(BaseOperator):
self,
*,
dataset_table: str,
- mysql_table: str,
+ target_table_name: str | None,
selected_fields: list[str] | str | None = None,
gcp_conn_id: str = "google_cloud_default",
- mysql_conn_id: str = "mysql_default",
database: str | None = None,
replace: bool = False,
batch_size: int = 1000,
@@ -102,9 +86,8 @@ class BigQueryToMySqlOperator(BaseOperator):
super().__init__(**kwargs)
self.selected_fields = selected_fields
self.gcp_conn_id = gcp_conn_id
- self.mysql_conn_id = mysql_conn_id
self.database = database
- self.mysql_table = mysql_table
+ self.target_table_name = target_table_name
self.replace = replace
self.batch_size = batch_size
self.location = location
@@ -114,13 +97,21 @@ class BigQueryToMySqlOperator(BaseOperator):
except ValueError:
raise ValueError(f"Could not parse {dataset_table} as <dataset>.<table>") from None
+ @abc.abstractmethod
+ def get_sql_hook(self) -> DbApiHook:
+ """Return a concrete SQL Hook (a PostgresHook for instance)."""
+
+ def persist_links(self, context: Context) -> None:
+ """This function persists the connection to the SQL provider."""
+
def execute(self, context: Context) -> None:
big_query_hook = BigQueryHook(
gcp_conn_id=self.gcp_conn_id,
location=self.location,
impersonation_chain=self.impersonation_chain,
)
- mysql_hook = MySqlHook(schema=self.database, mysql_conn_id=self.mysql_conn_id)
+ self.persist_links(context)
+ sql_hook = self.get_sql_hook()
for rows in bigquery_get_data(
self.log,
self.dataset_id,
@@ -129,8 +120,8 @@ class BigQueryToMySqlOperator(BaseOperator):
self.batch_size,
self.selected_fields,
):
- mysql_hook.insert_rows(
- table=self.mysql_table,
+ sql_hook.insert_rows(
+ table=self.target_table_name,
rows=rows,
target_fields=self.selected_fields,
replace=self.replace,
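The extension contract of the new base class is intentionally small: override ``get_sql_hook()`` to return a concrete ``DbApiHook``, and optionally ``persist_links()`` for operator extra links. A hypothetical subclass sketch; the SQLite operator below is illustrative only, not part of this commit:

    from airflow.providers.google.cloud.transfers.bigquery_to_sql import BigQueryToSqlBaseOperator
    from airflow.providers.sqlite.hooks.sqlite import SqliteHook


    class BigQueryToSqliteOperator(BigQueryToSqlBaseOperator):
        """Illustrative only: copy a BigQuery table into a local SQLite table."""

        def __init__(self, *, sqlite_conn_id: str = "sqlite_default", **kwargs) -> None:
            super().__init__(**kwargs)
            self.sqlite_conn_id = sqlite_conn_id

        def get_sql_hook(self) -> SqliteHook:
            # execute() in the base class batches rows via bigquery_get_data()
            # and calls insert_rows() on whatever hook is returned here.
            return SqliteHook(sqlite_conn_id=self.sqlite_conn_id)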
diff --git a/airflow/providers/google/provider.yaml b/airflow/providers/google/provider.yaml
index 1ed1c35c24..384eb47c79 100644
--- a/airflow/providers/google/provider.yaml
+++ b/airflow/providers/google/provider.yaml
@@ -936,6 +936,9 @@ transfers:
- source-integration-name: PostgreSQL
target-integration-name: Google Cloud Storage (GCS)
python-module: airflow.providers.google.cloud.transfers.postgres_to_gcs
+ - source-integration-name: Google BigQuery
+ target-integration-name: Common SQL
+ python-module: airflow.providers.google.cloud.transfers.bigquery_to_sql
- source-integration-name: Google BigQuery
target-integration-name: MySQL
how-to-guide: /docs/apache-airflow-providers-google/operators/transfer/bigquery_to_mysql.rst
@@ -944,6 +947,10 @@ transfers:
target-integration-name: Microsoft SQL Server (MSSQL)
how-to-guide: /docs/apache-airflow-providers-google/operators/transfer/bigquery_to_mssql.rst
python-module: airflow.providers.google.cloud.transfers.bigquery_to_mssql
+ - source-integration-name: Google BigQuery
+ target-integration-name: PostgreSQL
+ how-to-guide: /docs/apache-airflow-providers-google/operators/transfer/bigquery_to_postgres.rst
+ python-module: airflow.providers.google.cloud.transfers.bigquery_to_postgres
- source-integration-name: Google Cloud Storage (GCS)
target-integration-name: Google BigQuery
how-to-guide: /docs/apache-airflow-providers-google/operators/transfer/gcs_to_bigquery.rst
diff --git a/docs/apache-airflow-providers-google/operators/transfer/bigquery_to_mysql.rst b/docs/apache-airflow-providers-google/operators/transfer/bigquery_to_mysql.rst
index 36c9b6b928..b113b56087 100644
--- a/docs/apache-airflow-providers-google/operators/transfer/bigquery_to_mysql.rst
+++ b/docs/apache-airflow-providers-google/operators/transfer/bigquery_to_mysql.rst
@@ -51,7 +51,7 @@ Transferring data
The following Operator copies data from a BigQuery table to MySQL.
-.. exampleinclude:: /../../airflow/providers/google/cloud/transfers/bigquery_to_mysql.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/bigquery/example_bigquery_to_mysql.py
:language: python
:dedent: 4
:start-after: [START howto_operator_bigquery_to_mysql]
diff --git a/docs/apache-airflow-providers-google/operators/transfer/bigquery_to_mysql.rst b/docs/apache-airflow-providers-google/operators/transfer/bigquery_to_postgres.rst
similarity index 67%
copy from docs/apache-airflow-providers-google/operators/transfer/bigquery_to_mysql.rst
copy to docs/apache-airflow-providers-google/operators/transfer/bigquery_to_postgres.rst
index 36c9b6b928..d19a28c8ea 100644
--- a/docs/apache-airflow-providers-google/operators/transfer/bigquery_to_mysql.rst
+++ b/docs/apache-airflow-providers-google/operators/transfer/bigquery_to_postgres.rst
@@ -16,13 +16,13 @@
under the License.
-Google Cloud BigQuery Transfer Operator to MySQL
-================================================
+Google Cloud BigQuery Transfer Operator to Postgres
+===================================================
`Google Cloud BigQuery <https://cloud.google.com/bigquery>`__ is Google Cloud's serverless
data warehouse offering.
-`MySQL <https://www.mysql.com/>`__ is an open-source relational database management system.
-This operator can be used to copy data from a BigQuery table to MySQL.
+`PostgreSQL <https://www.postgresql.org/>`__ is an open-source relational database management system.
+This operator can be used to copy data from a BigQuery table to PostgreSQL.
Prerequisite Tasks
@@ -30,16 +30,16 @@ Prerequisite Tasks
.. include::/operators/_partials/prerequisite_tasks.rst
-.. _howto/operator:BigQueryToMySqlOperator:
+.. _howto/operator:BigQueryToPostgresOperator:
Operator
^^^^^^^^
-Copying data from one BigQuery table to another is performed with the
-:class:`~airflow.providers.google.cloud.transfers.bigquery_to_mysql.BigQueryToMySqlOperator` operator.
+Copying data from a BigQuery table to a Postgres table is performed with the
+:class:`~airflow.providers.google.cloud.transfers.bigquery_to_postgres.BigQueryToPostgresOperator` operator.
Use :ref:`Jinja templating <concepts:jinja-templating>` with
-:template-fields:`airflow.providers.google.cloud.transfers.bigquery_to_mysql.BigQueryToMySqlOperator`
+:template-fields:`airflow.providers.google.cloud.transfers.bigquery_to_postgres.BigQueryToPostgresOperator`
to define values dynamically.
You may use the parameter ``selected_fields`` to limit the fields to be copied
(all fields by default),
@@ -49,13 +49,13 @@ For more information, please refer to the links above.
Transferring data
-----------------
-The following Operator copies data from a BigQuery table to MySQL.
+The following Operator copies data from a BigQuery table to PostgreSQL.
-.. exampleinclude:: /../../airflow/providers/google/cloud/transfers/bigquery_to_mysql.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/bigquery/example_bigquery_to_postgres.py
:language: python
:dedent: 4
- :start-after: [START howto_operator_bigquery_to_mysql]
- :end-before: [END howto_operator_bigquery_to_mysql]
+ :start-after: [START howto_operator_bigquery_to_postgres]
+ :end-before: [END howto_operator_bigquery_to_postgres]
Reference
@@ -64,4 +64,4 @@ Reference
For further information, look at:
* `Google Cloud BigQuery Documentation <https://cloud.google.com/bigquery/docs/>`__
-* `MySQL Documentation <https://dev.mysql.com/doc/>`__
+* `PostgreSQL Documentation <https://www.postgresql.org/docs/>`__
diff --git a/tests/always/test_project_structure.py b/tests/always/test_project_structure.py
index be54136507..9f0d36b4e2 100644
--- a/tests/always/test_project_structure.py
+++ b/tests/always/test_project_structure.py
@@ -274,6 +274,7 @@ class TestGoogleProviderProjectStructure(ExampleCoverageTest, AssetsCoverageTest
BASE_CLASSES = {
"airflow.providers.google.cloud.operators.compute.ComputeEngineBaseOperator",
+ "airflow.providers.google.cloud.transfers.bigquery_to_sql.BigQueryToSqlBaseOperator",
"airflow.providers.google.cloud.operators.cloud_sql.CloudSQLBaseOperator",
"airflow.providers.google.cloud.operators.dataproc.DataprocJobBaseOperator",
"airflow.providers.google.cloud.operators.vertex_ai.custom_job.CustomTrainingJobBaseOperator",
@@ -285,7 +286,6 @@ class TestGoogleProviderProjectStructure(ExampleCoverageTest, AssetsCoverageTest
"airflow.providers.google.cloud.operators.dlp.CloudDLPRedactImageOperator",
"airflow.providers.google.cloud.transfers.cassandra_to_gcs.CassandraToGCSOperator",
"airflow.providers.google.cloud.transfers.adls_to_gcs.ADLSToGCSOperator",
- "airflow.providers.google.cloud.transfers.bigquery_to_mysql.BigQueryToMySqlOperator",
"airflow.providers.google.cloud.transfers.sql_to_gcs.BaseSQLToGCSOperator",
"airflow.providers.google.cloud.operators.vertex_ai.endpoint_service.GetEndpointOperator",
"airflow.providers.google.cloud.operators.vertex_ai.auto_ml.AutoMLTrainingJobBaseOperator",
diff --git a/tests/providers/google/cloud/transfers/test_bigquery_to_mssql.py b/tests/providers/google/cloud/transfers/test_bigquery_to_mssql.py
index 1fd5105767..0e924c080a 100644
--- a/tests/providers/google/cloud/transfers/test_bigquery_to_mssql.py
+++ b/tests/providers/google/cloud/transfers/test_bigquery_to_mssql.py
@@ -36,18 +36,17 @@ TEST_DAG_ID = "test-bigquery-operators"
@pytest.mark.backend("mssql")
class TestBigQueryToMsSqlOperator:
- @mock.patch("airflow.providers.google.cloud.transfers.bigquery_to_mssql.BigQueryHook")
+ @mock.patch("airflow.providers.google.cloud.transfers.bigquery_to_sql.BigQueryHook")
def test_execute_good_request_to_bq(self, mock_hook):
destination_table = "table"
operator = BigQueryToMsSqlOperator(
task_id=TASK_ID,
source_project_dataset_table=f"{TEST_PROJECT_ID}.{TEST_DATASET}.{TEST_TABLE_ID}",
- mssql_table=destination_table,
+ target_table_name=destination_table,
replace=False,
)
operator.execute(context=mock.MagicMock())
- # fmt: off
mock_hook.return_value.list_rows.assert_called_once_with(
dataset_id=TEST_DATASET,
table_id=TEST_TABLE_ID,
@@ -55,4 +54,3 @@ class TestBigQueryToMsSqlOperator:
selected_fields=None,
start_index=0,
)
- # fmt: on
diff --git a/tests/providers/google/cloud/transfers/test_bigquery_to_mysql.py b/tests/providers/google/cloud/transfers/test_bigquery_to_mysql.py
index 720961d3c4..3e24bef38d 100644
--- a/tests/providers/google/cloud/transfers/test_bigquery_to_mysql.py
+++ b/tests/providers/google/cloud/transfers/test_bigquery_to_mysql.py
@@ -28,18 +28,17 @@ TEST_DAG_ID = "test-bigquery-operators"
class TestBigQueryToMySqlOperator:
- @mock.patch("airflow.providers.google.cloud.transfers.bigquery_to_mysql.BigQueryHook")
+ @mock.patch("airflow.providers.google.cloud.transfers.bigquery_to_sql.BigQueryHook")
def test_execute_good_request_to_bq(self, mock_hook):
destination_table = "table"
operator = BigQueryToMySqlOperator(
task_id=TASK_ID,
dataset_table=f"{TEST_DATASET}.{TEST_TABLE_ID}",
- mysql_table=destination_table,
+ target_table_name=destination_table,
replace=False,
)
operator.execute(None)
- # fmt: off
mock_hook.return_value.list_rows.assert_called_once_with(
dataset_id=TEST_DATASET,
table_id=TEST_TABLE_ID,
@@ -47,4 +46,3 @@ class TestBigQueryToMySqlOperator:
selected_fields=None,
start_index=0,
)
- # fmt: on
diff --git a/tests/providers/google/cloud/transfers/test_bigquery_to_mysql.py b/tests/providers/google/cloud/transfers/test_bigquery_to_postgres.py
similarity index 82%
copy from tests/providers/google/cloud/transfers/test_bigquery_to_mysql.py
copy to tests/providers/google/cloud/transfers/test_bigquery_to_postgres.py
index 720961d3c4..fc492f7ea6 100644
--- a/tests/providers/google/cloud/transfers/test_bigquery_to_mysql.py
+++ b/tests/providers/google/cloud/transfers/test_bigquery_to_postgres.py
@@ -19,7 +19,7 @@ from __future__ import annotations
from unittest import mock
-from airflow.providers.google.cloud.transfers.bigquery_to_mysql import BigQueryToMySqlOperator
+from airflow.providers.google.cloud.transfers.bigquery_to_postgres import BigQueryToPostgresOperator
TASK_ID = "test-bq-create-table-operator"
TEST_DATASET = "test-dataset"
@@ -27,19 +27,18 @@ TEST_TABLE_ID = "test-table-id"
TEST_DAG_ID = "test-bigquery-operators"
-class TestBigQueryToMySqlOperator:
- @mock.patch("airflow.providers.google.cloud.transfers.bigquery_to_mysql.BigQueryHook")
+class TestBigQueryToPostgresOperator:
+ @mock.patch("airflow.providers.google.cloud.transfers.bigquery_to_sql.BigQueryHook")
def test_execute_good_request_to_bq(self, mock_hook):
destination_table = "table"
- operator = BigQueryToMySqlOperator(
+ operator = BigQueryToPostgresOperator(
task_id=TASK_ID,
dataset_table=f"{TEST_DATASET}.{TEST_TABLE_ID}",
- mysql_table=destination_table,
+ target_table_name=destination_table,
replace=False,
)
- operator.execute(None)
- # fmt: off
+ operator.execute(context=mock.MagicMock())
mock_hook.return_value.list_rows.assert_called_once_with(
dataset_id=TEST_DATASET,
table_id=TEST_TABLE_ID,
@@ -47,4 +46,3 @@ class TestBigQueryToMySqlOperator:
selected_fields=None,
start_index=0,
)
- # fmt: on
diff --git a/tests/system/providers/google/cloud/bigquery/example_bigquery_to_mssql.py b/tests/system/providers/google/cloud/bigquery/example_bigquery_to_mssql.py
index 9b9a69fc43..b91e5881ac 100644
--- a/tests/system/providers/google/cloud/bigquery/example_bigquery_to_mssql.py
+++ b/tests/system/providers/google/cloud/bigquery/example_bigquery_to_mssql.py
@@ -35,7 +35,7 @@ from airflow.providers.google.cloud.operators.bigquery import (
try:
from airflow.providers.google.cloud.transfers.bigquery_to_mssql import BigQueryToMsSqlOperator
except ImportError:
- pytest.skip("MySQL not available", allow_module_level=True)
+ pytest.skip("MsSQL not available", allow_module_level=True)
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
@@ -57,7 +57,7 @@ with models.DAG(
bigquery_to_mssql = BigQueryToMsSqlOperator(
task_id="bigquery_to_mssql",
source_project_dataset_table=f"{PROJECT_ID}.{DATASET_NAME}.{TABLE}",
- mssql_table=destination_table,
+ target_table_name=destination_table,
replace=False,
)
# [END howto_operator_bigquery_to_mssql]
diff --git a/tests/system/providers/google/cloud/bigquery/example_bigquery_to_mssql.py b/tests/system/providers/google/cloud/bigquery/example_bigquery_to_mysql.py
similarity index 83%
copy from tests/system/providers/google/cloud/bigquery/example_bigquery_to_mssql.py
copy to tests/system/providers/google/cloud/bigquery/example_bigquery_to_mysql.py
index 9b9a69fc43..11b898a569 100644
--- a/tests/system/providers/google/cloud/bigquery/example_bigquery_to_mssql.py
+++ b/tests/system/providers/google/cloud/bigquery/example_bigquery_to_mysql.py
@@ -33,18 +33,17 @@ from airflow.providers.google.cloud.operators.bigquery import (
)
try:
- from airflow.providers.google.cloud.transfers.bigquery_to_mssql import BigQueryToMsSqlOperator
+ from airflow.providers.google.cloud.transfers.bigquery_to_mysql import BigQueryToMySqlOperator
except ImportError:
pytest.skip("MySQL not available", allow_module_level=True)
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
-PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
-DAG_ID = "example_bigquery_to_mssql"
+DAG_ID = "example_bigquery_to_mysql"
DATASET_NAME = f"dataset_{DAG_ID}_{ENV_ID}"
DATA_EXPORT_BUCKET_NAME = os.environ.get("GCP_BIGQUERY_EXPORT_BUCKET_NAME", "INVALID BUCKET NAME")
TABLE = "table_42"
-destination_table = "mssql_table_test"
+destination_table = "mysql_table_test"
with models.DAG(
DAG_ID,
@@ -53,14 +52,14 @@ with models.DAG(
catchup=False,
tags=["example", "bigquery"],
) as dag:
- # [START howto_operator_bigquery_to_mssql]
- bigquery_to_mssql = BigQueryToMsSqlOperator(
- task_id="bigquery_to_mssql",
- source_project_dataset_table=f"{PROJECT_ID}.{DATASET_NAME}.{TABLE}",
- mssql_table=destination_table,
+ # [START howto_operator_bigquery_to_mysql]
+ bigquery_to_mysql = BigQueryToMySqlOperator(
+ task_id="bigquery_to_mysql",
+ dataset_table=f"{DATASET_NAME}.{TABLE}",
+ target_table_name=destination_table,
replace=False,
)
- # [END howto_operator_bigquery_to_mssql]
+ # [END howto_operator_bigquery_to_mysql]
create_dataset = BigQueryCreateEmptyDatasetOperator(task_id="create_dataset", dataset_id=DATASET_NAME)
@@ -83,7 +82,7 @@ with models.DAG(
create_dataset
>> create_table
# TEST BODY
- >> bigquery_to_mssql
+ >> bigquery_to_mysql
# TEST TEARDOWN
>> delete_dataset
)
diff --git a/tests/system/providers/google/cloud/bigquery/example_bigquery_to_mssql.py b/tests/system/providers/google/cloud/bigquery/example_bigquery_to_postgres.py
similarity index 81%
copy from tests/system/providers/google/cloud/bigquery/example_bigquery_to_mssql.py
copy to tests/system/providers/google/cloud/bigquery/example_bigquery_to_postgres.py
index 9b9a69fc43..80aa19f99c 100644
--- a/tests/system/providers/google/cloud/bigquery/example_bigquery_to_mssql.py
+++ b/tests/system/providers/google/cloud/bigquery/example_bigquery_to_postgres.py
@@ -33,18 +33,17 @@ from airflow.providers.google.cloud.operators.bigquery import (
)
try:
- from airflow.providers.google.cloud.transfers.bigquery_to_mssql import BigQueryToMsSqlOperator
+ from airflow.providers.google.cloud.transfers.bigquery_to_postgres import BigQueryToPostgresOperator
except ImportError:
- pytest.skip("MySQL not available", allow_module_level=True)
+ pytest.skip("PostgreSQL not available", allow_module_level=True)
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
-PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
-DAG_ID = "example_bigquery_to_mssql"
+DAG_ID = "example_bigquery_to_postgres"
DATASET_NAME = f"dataset_{DAG_ID}_{ENV_ID}"
DATA_EXPORT_BUCKET_NAME = os.environ.get("GCP_BIGQUERY_EXPORT_BUCKET_NAME", "INVALID BUCKET NAME")
TABLE = "table_42"
-destination_table = "mssql_table_test"
+destination_table = "postgres_table_test"
with models.DAG(
DAG_ID,
@@ -53,14 +52,14 @@ with models.DAG(
catchup=False,
tags=["example", "bigquery"],
) as dag:
- # [START howto_operator_bigquery_to_mssql]
- bigquery_to_mssql = BigQueryToMsSqlOperator(
- task_id="bigquery_to_mssql",
- source_project_dataset_table=f"{PROJECT_ID}.{DATASET_NAME}.{TABLE}",
- mssql_table=destination_table,
+ # [START howto_operator_bigquery_to_postgres]
+ bigquery_to_postgres = BigQueryToPostgresOperator(
+ task_id="bigquery_to_postgres",
+ dataset_table=f"{DATASET_NAME}.{TABLE}",
+ target_table_name=destination_table,
replace=False,
)
- # [END howto_operator_bigquery_to_mssql]
+ # [END howto_operator_bigquery_to_postgres]
create_dataset = BigQueryCreateEmptyDatasetOperator(task_id="create_dataset", dataset_id=DATASET_NAME)
@@ -83,7 +82,7 @@ with models.DAG(
create_dataset
>> create_table
# TEST BODY
- >> bigquery_to_mssql
+ >> bigquery_to_postgres
# TEST TEARDOWN
>> delete_dataset
)