This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 49921763eb Migrate system test for PostgresToGCSOperator to new design 
AIP-47 (#32641)
49921763eb is described below

commit 49921763eb15f68f91da826a86690ba4c4155c35
Author: max <[email protected]>
AuthorDate: Fri Jul 21 12:20:41 2023 +0200

    Migrate system test for PostgresToGCSOperator to new design AIP-47 (#32641)
    
    Co-authored-by: Niko Oliveira <[email protected]>
---
 .../cloud/example_dags/example_postgres_to_gcs.py  |  51 -----
 .../google/cloud/transfers/postgres_to_gcs.py      |   4 +
 airflow/providers/google/provider.yaml             |   1 +
 .../operators/transfer/postgres_to_gcs.rst         |  52 +++++
 .../cloud/transfers/test_postgres_to_gcs_system.py |   2 +-
 .../cloud/transfers/example_postgres_to_gcs.py     | 227 +++++++++++++++++++++
 6 files changed, 285 insertions(+), 52 deletions(-)

diff --git 
a/airflow/providers/google/cloud/example_dags/example_postgres_to_gcs.py 
b/airflow/providers/google/cloud/example_dags/example_postgres_to_gcs.py
deleted file mode 100644
index eca37996ee..0000000000
--- a/airflow/providers/google/cloud/example_dags/example_postgres_to_gcs.py
+++ /dev/null
@@ -1,51 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-"""
-Example DAG using PostgresToGoogleCloudStorageOperator.
-"""
-from __future__ import annotations
-
-import os
-from datetime import datetime
-
-from airflow import models
-from airflow.providers.google.cloud.transfers.postgres_to_gcs import 
PostgresToGCSOperator
-
-PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
-GCS_BUCKET = os.environ.get("GCP_GCS_BUCKET_NAME", "INVALID BUCKET NAME")
-FILENAME = "test_file"
-SQL_QUERY = "select * from test_table;"
-
-with models.DAG(
-    dag_id="example_postgres_to_gcs",
-    start_date=datetime(2021, 1, 1),
-    catchup=False,
-    tags=["example"],
-) as dag:
-    upload_data = PostgresToGCSOperator(
-        task_id="get_data", sql=SQL_QUERY, bucket=GCS_BUCKET, 
filename=FILENAME, gzip=False
-    )
-
-    upload_data_server_side_cursor = PostgresToGCSOperator(
-        task_id="get_data_with_server_side_cursor",
-        sql=SQL_QUERY,
-        bucket=GCS_BUCKET,
-        filename=FILENAME,
-        gzip=False,
-        use_server_side_cursor=True,
-    )
diff --git a/airflow/providers/google/cloud/transfers/postgres_to_gcs.py 
b/airflow/providers/google/cloud/transfers/postgres_to_gcs.py
index f83b47ea32..9120f7c95a 100644
--- a/airflow/providers/google/cloud/transfers/postgres_to_gcs.py
+++ b/airflow/providers/google/cloud/transfers/postgres_to_gcs.py
@@ -69,6 +69,10 @@ class PostgresToGCSOperator(BaseSQLToGCSOperator):
     """
     Copy data from Postgres to Google Cloud Storage in JSON, CSV or Parquet 
format.
 
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:PostgresToGCSOperator`
+
     :param postgres_conn_id: Reference to a specific Postgres hook.
     :param use_server_side_cursor: If server-side cursor should be used for 
querying postgres.
         For detailed info, check 
https://www.psycopg.org/docs/usage.html#server-side-cursors
diff --git a/airflow/providers/google/provider.yaml 
b/airflow/providers/google/provider.yaml
index 3a8471de26..14dc8c7890 100644
--- a/airflow/providers/google/provider.yaml
+++ b/airflow/providers/google/provider.yaml
@@ -927,6 +927,7 @@ transfers:
     python-module: airflow.providers.google.cloud.transfers.gcs_to_sftp
   - source-integration-name: PostgreSQL
     target-integration-name: Google Cloud Storage (GCS)
+    how-to-guide: 
/docs/apache-airflow-providers-google/operators/transfer/postgres_to_gcs.rst
     python-module: airflow.providers.google.cloud.transfers.postgres_to_gcs
   - source-integration-name: Google BigQuery
     target-integration-name: Common SQL
diff --git 
a/docs/apache-airflow-providers-google/operators/transfer/postgres_to_gcs.rst 
b/docs/apache-airflow-providers-google/operators/transfer/postgres_to_gcs.rst
new file mode 100644
index 0000000000..b6b01ce864
--- /dev/null
+++ 
b/docs/apache-airflow-providers-google/operators/transfer/postgres_to_gcs.rst
@@ -0,0 +1,52 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+Postgres To Google Cloud Storage Operator
+=========================================
+The `Google Cloud Storage <https://cloud.google.com/storage/>`__ (GCS) service 
is
+used to store large data from various applications. This page shows how to copy
+data from Postgres to GCS.
+
+Prerequisite Tasks
+^^^^^^^^^^^^^^^^^^
+
+.. include:: /operators/_partials/prerequisite_tasks.rst
+
+.. _howto/operator:PostgresToGCSOperator:
+
+PostgresToGCSOperator
+~~~~~~~~~~~~~~~~~~~~~
+
+:class:`~airflow.providers.google.cloud.transfers.postgres_to_gcs.PostgresToGCSOperator`
 allows you to upload
+data from Postgres database to GCS.
+
+When you use this operator, you can optionally compress the data being 
uploaded to gzip format.
+
+Below is an example of using this operator to upload data to GCS.
+
+.. exampleinclude:: 
/../../tests/system/providers/google/cloud/transfers/example_postgres_to_gcs.py
+    :language: python
+    :dedent: 0
+    :start-after: [START howto_operator_postgres_to_gcs]
+    :end-before: [END howto_operator_postgres_to_gcs]
+
+
+Reference
+---------
+
+For further information, look at:
+* `Google Cloud Storage Documentation <https://cloud.google.com/storage/>`__
diff --git 
a/tests/providers/google/cloud/transfers/test_postgres_to_gcs_system.py 
b/tests/providers/google/cloud/transfers/test_postgres_to_gcs_system.py
index 8b88f3f2d9..262b41ca24 100644
--- a/tests/providers/google/cloud/transfers/test_postgres_to_gcs_system.py
+++ b/tests/providers/google/cloud/transfers/test_postgres_to_gcs_system.py
@@ -20,9 +20,9 @@ from __future__ import annotations
 import pytest
 from psycopg2 import ProgrammingError
 
-from airflow.providers.google.cloud.example_dags.example_postgres_to_gcs 
import GCS_BUCKET
 from airflow.providers.postgres.hooks.postgres import PostgresHook
 from tests.providers.google.cloud.utils.gcp_authenticator import GCP_GCS_KEY
+from tests.system.providers.google.cloud.transfers.example_postgres_to_gcs 
import BUCKET_NAME as GCS_BUCKET
 from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, 
GoogleSystemTest, provide_gcp_context
 
 CREATE_QUERY = """
diff --git 
a/tests/system/providers/google/cloud/transfers/example_postgres_to_gcs.py 
b/tests/system/providers/google/cloud/transfers/example_postgres_to_gcs.py
new file mode 100644
index 0000000000..64e1e1574d
--- /dev/null
+++ b/tests/system/providers/google/cloud/transfers/example_postgres_to_gcs.py
@@ -0,0 +1,227 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Example DAG using PostgresToGoogleCloudStorageOperator.
+"""
+from __future__ import annotations
+
+import logging
+import os
+from datetime import datetime
+
+from googleapiclient import discovery
+
+from airflow import models
+from airflow.decorators import task
+from airflow.models import Connection
+from airflow.operators.bash import BashOperator
+from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
+from airflow.providers.google.cloud.operators.cloud_sql import (
+    CloudSQLCreateInstanceDatabaseOperator,
+    CloudSQLCreateInstanceOperator,
+    CloudSQLDeleteInstanceOperator,
+)
+from airflow.providers.google.cloud.operators.gcs import (
+    GCSCreateBucketOperator,
+    GCSDeleteBucketOperator,
+)
+from airflow.providers.google.cloud.transfers.postgres_to_gcs import 
PostgresToGCSOperator
+from airflow.settings import Session
+from airflow.utils.trigger_rule import TriggerRule
+
+DAG_ID = "example_postgres_to_gcs"
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "example-project")
+
+CLOUD_SQL_INSTANCE = f"cloud-sql-{DAG_ID}-{ENV_ID}".replace("_", "-")
+CLOUD_SQL_INSTANCE_CREATION_BODY = {
+    "name": CLOUD_SQL_INSTANCE,
+    "settings": {
+        "tier": "db-custom-1-3840",
+        "dataDiskSizeGb": 30,
+        "ipConfiguration": {
+            "ipv4Enabled": True,
+            "requireSsl": False,
+            # Consider specifying your network mask
+            # for allowing requests only from the trusted sources, not from 
anywhere
+            "authorizedNetworks": [
+                {"value": "0.0.0.0/0"},
+            ],
+        },
+        "pricingPlan": "PER_USE",
+    },
+    "databaseVersion": "POSTGRES_15",
+    "region": "us-central1",
+}
+DB_NAME = f"{DAG_ID}-{ENV_ID}-db".replace("-", "_")
+DB_PORT = 5432
+DB_CREATE_BODY = {"instance": CLOUD_SQL_INSTANCE, "name": DB_NAME, "project": 
PROJECT_ID}
+DB_USER_NAME = "demo_user"
+DB_USER_PASSWORD = "demo_password"
+CONNECTION_ID = f"postgres_{DAG_ID}_{ENV_ID}".replace("-", "_")
+
+BUCKET_NAME = f"{DAG_ID}_{ENV_ID}_bucket"
+FILE_NAME = "result.json"
+
+SQL_TABLE = "test_table"
+SQL_CREATE = f"CREATE TABLE IF NOT EXISTS {SQL_TABLE} (col_1 INT, col_2 
VARCHAR(8))"
+SQL_INSERT = f"INSERT INTO {SQL_TABLE} (col_1, col_2) VALUES (1, 'one'), (2, 
'two')"
+SQL_SELECT = f"SELECT * FROM {SQL_TABLE}"
+
+log = logging.getLogger(__name__)
+
+
+with models.DAG(
+    dag_id=DAG_ID,
+    schedule="@once",
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    tags=["example", "postgres", "gcs"],
+) as dag:
+    create_cloud_sql_instance = CloudSQLCreateInstanceOperator(
+        task_id="create_cloud_sql_instance",
+        project_id=PROJECT_ID,
+        instance=CLOUD_SQL_INSTANCE,
+        body=CLOUD_SQL_INSTANCE_CREATION_BODY,
+    )
+
+    create_database = CloudSQLCreateInstanceDatabaseOperator(
+        task_id="create_database", body=DB_CREATE_BODY, 
instance=CLOUD_SQL_INSTANCE
+    )
+
+    @task
+    def create_user() -> None:
+        with discovery.build("sqladmin", "v1beta4") as service:
+            request = service.users().insert(
+                project=PROJECT_ID,
+                instance=CLOUD_SQL_INSTANCE,
+                body={
+                    "name": DB_USER_NAME,
+                    "password": DB_USER_PASSWORD,
+                },
+            )
+            request.execute()
+
+    create_user_task = create_user()
+
+    @task
+    def get_public_ip() -> str | None:
+        with discovery.build("sqladmin", "v1beta4") as service:
+            request = service.connect().get(
+                project=PROJECT_ID, instance=CLOUD_SQL_INSTANCE, 
fields="ipAddresses"
+            )
+            response = request.execute()
+            for ip_item in response.get("ipAddresses", []):
+                if ip_item["type"] == "PRIMARY":
+                    return ip_item["ipAddress"]
+
+    get_public_ip_task = get_public_ip()
+
+    @task
+    def setup_postgres_connection(**kwargs) -> None:
+        public_ip = kwargs["ti"].xcom_pull(task_ids="get_public_ip")
+        connection = Connection(
+            conn_id=CONNECTION_ID,
+            description="Example PostgreSQL connection",
+            conn_type="postgres",
+            host=public_ip,
+            login=DB_USER_NAME,
+            password=DB_USER_PASSWORD,
+            schema=DB_NAME,
+            port=DB_PORT,
+        )
+        session: Session = Session()
+        if session.query(Connection).filter(Connection.conn_id == 
CONNECTION_ID).first():
+            log.warning("Connection %s already exists", CONNECTION_ID)
+            return None
+
+        session.add(connection)
+        session.commit()
+
+    setup_postgres_connection_task = setup_postgres_connection()
+
+    create_bucket = GCSCreateBucketOperator(
+        task_id="create_bucket",
+        bucket_name=BUCKET_NAME,
+    )
+
+    create_sql_table = SQLExecuteQueryOperator(
+        task_id="create_sql_table",
+        conn_id=CONNECTION_ID,
+        sql=SQL_CREATE,
+    )
+
+    insert_data = SQLExecuteQueryOperator(
+        task_id="insert_data",
+        conn_id=CONNECTION_ID,
+        sql=SQL_INSERT,
+    )
+
+    # [START howto_operator_postgres_to_gcs]
+    get_data = PostgresToGCSOperator(
+        task_id="get_data",
+        postgres_conn_id=CONNECTION_ID,
+        sql=SQL_SELECT,
+        bucket=BUCKET_NAME,
+        filename=FILE_NAME,
+        gzip=False,
+    )
+    # [END howto_operator_postgres_to_gcs]
+
+    delete_cloud_sql_instance = CloudSQLDeleteInstanceOperator(
+        task_id="delete_cloud_sql_instance",
+        project_id=PROJECT_ID,
+        instance=CLOUD_SQL_INSTANCE,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    delete_postgres_connection = BashOperator(
+        task_id="delete_postgres_connection",
+        bash_command=f"airflow connections delete {CONNECTION_ID}",
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    delete_bucket = GCSDeleteBucketOperator(
+        task_id="delete_bucket",
+        bucket_name=BUCKET_NAME,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    # TEST SETUP
+    create_cloud_sql_instance >> [create_database, create_user_task, 
get_public_ip_task]
+    [create_user_task, get_public_ip_task] >> setup_postgres_connection_task
+    create_database >> setup_postgres_connection_task >> create_sql_table >> 
insert_data
+    (
+        [insert_data, create_bucket]
+        # TEST BODY
+        >> get_data
+        # TEST TEARDOWN
+        >> [delete_cloud_sql_instance, delete_postgres_connection, 
delete_bucket]
+    )
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: 
tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)

Reply via email to the mailing list.