This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 49921763eb Migrate system test for PostgresToGCSOperator to new design AIP-47 (#32641)
49921763eb is described below
commit 49921763eb15f68f91da826a86690ba4c4155c35
Author: max <[email protected]>
AuthorDate: Fri Jul 21 12:20:41 2023 +0200
Migrate system test for PostgresToGCSOperator to new design AIP-47 (#32641)
Co-authored-by: Niko Oliveira <[email protected]>
---
.../cloud/example_dags/example_postgres_to_gcs.py | 51 -----
.../google/cloud/transfers/postgres_to_gcs.py | 4 +
airflow/providers/google/provider.yaml | 1 +
.../operators/transfer/postgres_to_gcs.rst | 52 +++++
.../cloud/transfers/test_postgres_to_gcs_system.py | 2 +-
.../cloud/transfers/example_postgres_to_gcs.py | 227 +++++++++++++++++++++
6 files changed, 285 insertions(+), 52 deletions(-)
diff --git a/airflow/providers/google/cloud/example_dags/example_postgres_to_gcs.py b/airflow/providers/google/cloud/example_dags/example_postgres_to_gcs.py
deleted file mode 100644
index eca37996ee..0000000000
--- a/airflow/providers/google/cloud/example_dags/example_postgres_to_gcs.py
+++ /dev/null
@@ -1,51 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-"""
-Example DAG using PostgresToGoogleCloudStorageOperator.
-"""
-from __future__ import annotations
-
-import os
-from datetime import datetime
-
-from airflow import models
-from airflow.providers.google.cloud.transfers.postgres_to_gcs import PostgresToGCSOperator
-
-PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
-GCS_BUCKET = os.environ.get("GCP_GCS_BUCKET_NAME", "INVALID BUCKET NAME")
-FILENAME = "test_file"
-SQL_QUERY = "select * from test_table;"
-
-with models.DAG(
- dag_id="example_postgres_to_gcs",
- start_date=datetime(2021, 1, 1),
- catchup=False,
- tags=["example"],
-) as dag:
- upload_data = PostgresToGCSOperator(
- task_id="get_data", sql=SQL_QUERY, bucket=GCS_BUCKET, filename=FILENAME, gzip=False
- )
-
- upload_data_server_side_cursor = PostgresToGCSOperator(
- task_id="get_data_with_server_side_cursor",
- sql=SQL_QUERY,
- bucket=GCS_BUCKET,
- filename=FILENAME,
- gzip=False,
- use_server_side_cursor=True,
- )
diff --git a/airflow/providers/google/cloud/transfers/postgres_to_gcs.py b/airflow/providers/google/cloud/transfers/postgres_to_gcs.py
index f83b47ea32..9120f7c95a 100644
--- a/airflow/providers/google/cloud/transfers/postgres_to_gcs.py
+++ b/airflow/providers/google/cloud/transfers/postgres_to_gcs.py
@@ -69,6 +69,10 @@ class PostgresToGCSOperator(BaseSQLToGCSOperator):
"""
Copy data from Postgres to Google Cloud Storage in JSON, CSV or Parquet format.
+ .. seealso::
+ For more information on how to use this operator, take a look at the guide:
+ :ref:`howto/operator:PostgresToGCSOperator`
+
:param postgres_conn_id: Reference to a specific Postgres hook.
:param use_server_side_cursor: If server-side cursor should be used for querying postgres.
For detailed info, check https://www.psycopg.org/docs/usage.html#server-side-cursors
diff --git a/airflow/providers/google/provider.yaml b/airflow/providers/google/provider.yaml
index 3a8471de26..14dc8c7890 100644
--- a/airflow/providers/google/provider.yaml
+++ b/airflow/providers/google/provider.yaml
@@ -927,6 +927,7 @@ transfers:
python-module: airflow.providers.google.cloud.transfers.gcs_to_sftp
- source-integration-name: PostgreSQL
target-integration-name: Google Cloud Storage (GCS)
+ how-to-guide: /docs/apache-airflow-providers-google/operators/transfer/postgres_to_gcs.rst
python-module: airflow.providers.google.cloud.transfers.postgres_to_gcs
- source-integration-name: Google BigQuery
target-integration-name: Common SQL
diff --git a/docs/apache-airflow-providers-google/operators/transfer/postgres_to_gcs.rst b/docs/apache-airflow-providers-google/operators/transfer/postgres_to_gcs.rst
new file mode 100644
index 0000000000..b6b01ce864
--- /dev/null
+++ b/docs/apache-airflow-providers-google/operators/transfer/postgres_to_gcs.rst
@@ -0,0 +1,52 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+Postgres To Google Cloud Storage Operator
+=========================================
+The `Google Cloud Storage <https://cloud.google.com/storage/>`__ (GCS) service is
+used to store large data from various applications. This page shows how to copy
+data from Postgres to GCS.
+
+Prerequisite Tasks
+^^^^^^^^^^^^^^^^^^
+
+.. include:: /operators/_partials/prerequisite_tasks.rst
+
+.. _howto/operator:PostgresToGCSOperator:
+
+PostgresToGCSOperator
+~~~~~~~~~~~~~~~~~~~~~
+
+:class:`~airflow.providers.google.cloud.transfers.postgres_to_gcs.PostgresToGCSOperator` allows you to upload
+data from Postgres database to GCS.
+
+When you use this operator, you can optionally compress the data being uploaded to gzip format.
+
+Below is an example of using this operator to upload data to GCS.
+
+.. exampleinclude:: /../../tests/system/providers/google/cloud/transfers/example_postgres_to_gcs.py
+ :language: python
+ :dedent: 0
+ :start-after: [START howto_operator_postgres_to_gcs]
+ :end-before: [END howto_operator_postgres_to_gcs]
+
+
+Reference
+---------
+
+For further information, look at:
+* `Google Cloud Storage Documentation <https://cloud.google.com/storage/>`__
diff --git a/tests/providers/google/cloud/transfers/test_postgres_to_gcs_system.py b/tests/providers/google/cloud/transfers/test_postgres_to_gcs_system.py
index 8b88f3f2d9..262b41ca24 100644
--- a/tests/providers/google/cloud/transfers/test_postgres_to_gcs_system.py
+++ b/tests/providers/google/cloud/transfers/test_postgres_to_gcs_system.py
@@ -20,9 +20,9 @@ from __future__ import annotations
import pytest
from psycopg2 import ProgrammingError
-from airflow.providers.google.cloud.example_dags.example_postgres_to_gcs import GCS_BUCKET
from airflow.providers.postgres.hooks.postgres import PostgresHook
from tests.providers.google.cloud.utils.gcp_authenticator import GCP_GCS_KEY
+from tests.system.providers.google.cloud.transfers.example_postgres_to_gcs import BUCKET_NAME as GCS_BUCKET
from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest, provide_gcp_context
CREATE_QUERY = """
diff --git a/tests/system/providers/google/cloud/transfers/example_postgres_to_gcs.py b/tests/system/providers/google/cloud/transfers/example_postgres_to_gcs.py
new file mode 100644
index 0000000000..64e1e1574d
--- /dev/null
+++ b/tests/system/providers/google/cloud/transfers/example_postgres_to_gcs.py
@@ -0,0 +1,227 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Example DAG using PostgresToGoogleCloudStorageOperator.
+"""
+from __future__ import annotations
+
+import logging
+import os
+from datetime import datetime
+
+from googleapiclient import discovery
+
+from airflow import models
+from airflow.decorators import task
+from airflow.models import Connection
+from airflow.operators.bash import BashOperator
+from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
+from airflow.providers.google.cloud.operators.cloud_sql import (
+ CloudSQLCreateInstanceDatabaseOperator,
+ CloudSQLCreateInstanceOperator,
+ CloudSQLDeleteInstanceOperator,
+)
+from airflow.providers.google.cloud.operators.gcs import (
+ GCSCreateBucketOperator,
+ GCSDeleteBucketOperator,
+)
+from airflow.providers.google.cloud.transfers.postgres_to_gcs import PostgresToGCSOperator
+from airflow.settings import Session
+from airflow.utils.trigger_rule import TriggerRule
+
+DAG_ID = "example_postgres_to_gcs"
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "example-project")
+
+CLOUD_SQL_INSTANCE = f"cloud-sql-{DAG_ID}-{ENV_ID}".replace("_", "-")
+CLOUD_SQL_INSTANCE_CREATION_BODY = {
+ "name": CLOUD_SQL_INSTANCE,
+ "settings": {
+ "tier": "db-custom-1-3840",
+ "dataDiskSizeGb": 30,
+ "ipConfiguration": {
+ "ipv4Enabled": True,
+ "requireSsl": False,
+ # Consider specifying your network mask
+ # for allowing requests only from the trusted sources, not from anywhere
+ "authorizedNetworks": [
+ {"value": "0.0.0.0/0"},
+ ],
+ },
+ "pricingPlan": "PER_USE",
+ },
+ "databaseVersion": "POSTGRES_15",
+ "region": "us-central1",
+}
+DB_NAME = f"{DAG_ID}-{ENV_ID}-db".replace("-", "_")
+DB_PORT = 5432
+DB_CREATE_BODY = {"instance": CLOUD_SQL_INSTANCE, "name": DB_NAME, "project": PROJECT_ID}
+DB_USER_NAME = "demo_user"
+DB_USER_PASSWORD = "demo_password"
+CONNECTION_ID = f"postgres_{DAG_ID}_{ENV_ID}".replace("-", "_")
+
+BUCKET_NAME = f"{DAG_ID}_{ENV_ID}_bucket"
+FILE_NAME = "result.json"
+
+SQL_TABLE = "test_table"
+SQL_CREATE = f"CREATE TABLE IF NOT EXISTS {SQL_TABLE} (col_1 INT, col_2 VARCHAR(8))"
+SQL_INSERT = f"INSERT INTO {SQL_TABLE} (col_1, col_2) VALUES (1, 'one'), (2, 'two')"
+SQL_SELECT = f"SELECT * FROM {SQL_TABLE}"
+
+log = logging.getLogger(__name__)
+
+
+with models.DAG(
+ dag_id=DAG_ID,
+ schedule="@once",
+ start_date=datetime(2021, 1, 1),
+ catchup=False,
+ tags=["example", "postgres", "gcs"],
+) as dag:
+ create_cloud_sql_instance = CloudSQLCreateInstanceOperator(
+ task_id="create_cloud_sql_instance",
+ project_id=PROJECT_ID,
+ instance=CLOUD_SQL_INSTANCE,
+ body=CLOUD_SQL_INSTANCE_CREATION_BODY,
+ )
+
+ create_database = CloudSQLCreateInstanceDatabaseOperator(
+ task_id="create_database", body=DB_CREATE_BODY, instance=CLOUD_SQL_INSTANCE
+ )
+
+ @task
+ def create_user() -> None:
+ with discovery.build("sqladmin", "v1beta4") as service:
+ request = service.users().insert(
+ project=PROJECT_ID,
+ instance=CLOUD_SQL_INSTANCE,
+ body={
+ "name": DB_USER_NAME,
+ "password": DB_USER_PASSWORD,
+ },
+ )
+ request.execute()
+
+ create_user_task = create_user()
+
+ @task
+ def get_public_ip() -> str | None:
+ with discovery.build("sqladmin", "v1beta4") as service:
+ request = service.connect().get(
+ project=PROJECT_ID, instance=CLOUD_SQL_INSTANCE, fields="ipAddresses"
+ )
+ response = request.execute()
+ for ip_item in response.get("ipAddresses", []):
+ if ip_item["type"] == "PRIMARY":
+ return ip_item["ipAddress"]
+
+ get_public_ip_task = get_public_ip()
+
+ @task
+ def setup_postgres_connection(**kwargs) -> None:
+ public_ip = kwargs["ti"].xcom_pull(task_ids="get_public_ip")
+ connection = Connection(
+ conn_id=CONNECTION_ID,
+ description="Example PostgreSQL connection",
+ conn_type="postgres",
+ host=public_ip,
+ login=DB_USER_NAME,
+ password=DB_USER_PASSWORD,
+ schema=DB_NAME,
+ port=DB_PORT,
+ )
+ session: Session = Session()
+ if session.query(Connection).filter(Connection.conn_id == CONNECTION_ID).first():
+ log.warning("Connection %s already exists", CONNECTION_ID)
+ return None
+
+ session.add(connection)
+ session.commit()
+
+ setup_postgres_connection_task = setup_postgres_connection()
+
+ create_bucket = GCSCreateBucketOperator(
+ task_id="create_bucket",
+ bucket_name=BUCKET_NAME,
+ )
+
+ create_sql_table = SQLExecuteQueryOperator(
+ task_id="create_sql_table",
+ conn_id=CONNECTION_ID,
+ sql=SQL_CREATE,
+ )
+
+ insert_data = SQLExecuteQueryOperator(
+ task_id="insert_data",
+ conn_id=CONNECTION_ID,
+ sql=SQL_INSERT,
+ )
+
+ # [START howto_operator_postgres_to_gcs]
+ get_data = PostgresToGCSOperator(
+ task_id="get_data",
+ postgres_conn_id=CONNECTION_ID,
+ sql=SQL_SELECT,
+ bucket=BUCKET_NAME,
+ filename=FILE_NAME,
+ gzip=False,
+ )
+ # [END howto_operator_postgres_to_gcs]
+
+ delete_cloud_sql_instance = CloudSQLDeleteInstanceOperator(
+ task_id="delete_cloud_sql_instance",
+ project_id=PROJECT_ID,
+ instance=CLOUD_SQL_INSTANCE,
+ trigger_rule=TriggerRule.ALL_DONE,
+ )
+
+ delete_postgres_connection = BashOperator(
+ task_id="delete_postgres_connection",
+ bash_command=f"airflow connections delete {CONNECTION_ID}",
+ trigger_rule=TriggerRule.ALL_DONE,
+ )
+
+ delete_bucket = GCSDeleteBucketOperator(
+ task_id="delete_bucket",
+ bucket_name=BUCKET_NAME,
+ trigger_rule=TriggerRule.ALL_DONE,
+ )
+
+ # TEST SETUP
+ create_cloud_sql_instance >> [create_database, create_user_task, get_public_ip_task]
+ [create_user_task, get_public_ip_task] >> setup_postgres_connection_task
+ create_database >> setup_postgres_connection_task >> create_sql_table >> insert_data
+ (
+ [insert_data, create_bucket]
+ # TEST BODY
+ >> get_data
+ # TEST TEARDOWN
+ >> [delete_cloud_sql_instance, delete_postgres_connection, delete_bucket]
+ )
+
+ from tests.system.utils.watcher import watcher
+
+ # This test needs watcher in order to properly mark success/failure
+ # when "tearDown" task with trigger rule is part of the DAG
+ list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)