This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e3e6776576 Fix system tests (#39193)
e3e6776576 is described below
commit e3e67765765b8c0404aa926629e826d9600d437e
Author: max <[email protected]>
AuthorDate: Tue Apr 23 09:59:18 2024 +0000
Fix system tests (#39193)
---
.../google/cloud/gcs/example_mysql_to_gcs.py | 194 ++++++++-------
.../cloud/sql_to_sheets/example_sql_to_sheets.py | 277 ++++++++++-----------
.../cloud/transfers/example_postgres_to_gcs.py | 200 +++++++--------
3 files changed, 327 insertions(+), 344 deletions(-)
diff --git a/tests/system/providers/google/cloud/gcs/example_mysql_to_gcs.py b/tests/system/providers/google/cloud/gcs/example_mysql_to_gcs.py
index 5b96b49c1d..884149d984 100644
--- a/tests/system/providers/google/cloud/gcs/example_mysql_to_gcs.py
+++ b/tests/system/providers/google/cloud/gcs/example_mysql_to_gcs.py
@@ -62,21 +62,38 @@ PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "example-project")
REGION = "europe-west2"
ZONE = REGION + "-a"
NETWORK = "default"
+CONNECTION_ID = f"mysql_{DAG_ID}_{ENV_ID}".replace("-", "_")
+CONNECTION_TYPE = "mysql"
+
+BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
+FILE_NAME = "result.json"
DB_NAME = "testdb"
DB_PORT = 3306
DB_USER_NAME = "root"
DB_USER_PASSWORD = "demo_password"
+SETUP_MYSQL_COMMAND = f"""
+sudo apt update &&
+sudo apt install -y docker.io &&
+sudo docker run -d -p {DB_PORT}:{DB_PORT} --name {DB_NAME} \
+ -e MYSQL_ROOT_PASSWORD={DB_USER_PASSWORD} \
+ -e MYSQL_DATABASE={DB_NAME} \
+ mysql:8.1.0
+"""
+SQL_TABLE = "test_table"
+SQL_CREATE = f"CREATE TABLE IF NOT EXISTS {DB_NAME}.{SQL_TABLE} (col_1 INT, col_2 VARCHAR(8))"
+SQL_INSERT = f"INSERT INTO {DB_NAME}.{SQL_TABLE} (col_1, col_2) VALUES (1, 'one'), (2, 'two')"
+SQL_SELECT = f"SELECT * FROM {DB_NAME}.{SQL_TABLE}"
-SHORT_MACHINE_TYPE_NAME = "n1-standard-1"
-DB_INSTANCE_NAME = f"instance-{DAG_ID}-{ENV_ID}".replace("_", "-")
+GCE_MACHINE_TYPE = "n1-standard-1"
+GCE_INSTANCE_NAME = f"instance-{DAG_ID}-{ENV_ID}".replace("_", "-")
GCE_INSTANCE_BODY = {
- "name": DB_INSTANCE_NAME,
- "machine_type": f"zones/{ZONE}/machineTypes/{SHORT_MACHINE_TYPE_NAME}",
+ "name": GCE_INSTANCE_NAME,
+ "machine_type": f"zones/{ZONE}/machineTypes/{GCE_MACHINE_TYPE}",
"disks": [
{
"boot": True,
- "device_name": DB_INSTANCE_NAME,
+ "device_name": GCE_INSTANCE_NAME,
"initialize_params": {
"disk_size_gb": "10",
"disk_type": f"zones/{ZONE}/diskTypes/pd-balanced",
@@ -92,56 +109,41 @@ GCE_INSTANCE_BODY = {
}
],
}
-DELETE_PERSISTENT_DISK = f"""
+FIREWALL_RULE_NAME = f"allow-http-{DB_PORT}"
+CREATE_FIREWALL_RULE_COMMAND = f"""
if [ $AIRFLOW__API__GOOGLE_KEY_PATH ]; then \
gcloud auth activate-service-account --key-file=$AIRFLOW__API__GOOGLE_KEY_PATH; \
fi;
-gcloud compute disks delete {DB_INSTANCE_NAME} --project={PROJECT_ID} --zone={ZONE} --quiet
+if [ -z gcloud compute firewall-rules list --filter=name:{FIREWALL_RULE_NAME} --format="value(name)" ]; then \
+ gcloud compute firewall-rules create {FIREWALL_RULE_NAME} \
+ --project={PROJECT_ID} \
+ --direction=INGRESS \
+ --priority=100 \
+ --network={NETWORK} \
+ --action=ALLOW \
+ --rules=tcp:{DB_PORT} \
+ --source-ranges=0.0.0.0/0
+else
+ echo "Firewall rule {FIREWALL_RULE_NAME} already exists."
+fi
"""
-
-SETUP_MYSQL = f"""
-sudo apt update &&
-sudo apt install -y docker.io &&
-sudo docker run -d -p {DB_PORT}:{DB_PORT} --name {DB_NAME} \
- -e MYSQL_ROOT_PASSWORD={DB_USER_PASSWORD} \
- -e MYSQL_DATABASE={DB_NAME} \
- mysql:8.1.0
-"""
-
-FIREWALL_RULE_NAME = f"allow-http-{DB_PORT}"
-CREATE_FIREWALL_RULE = f"""
+DELETE_FIREWALL_RULE_COMMAND = f"""
if [ $AIRFLOW__API__GOOGLE_KEY_PATH ]; then \
gcloud auth activate-service-account --key-file=$AIRFLOW__API__GOOGLE_KEY_PATH; \
+fi; \
+if [ gcloud compute firewall-rules list --filter=name:{FIREWALL_RULE_NAME} --format="value(name)" ]; then \
+ gcloud compute firewall-rules delete {FIREWALL_RULE_NAME} --project={PROJECT_ID} --quiet; \
fi;
-
-gcloud compute firewall-rules create {FIREWALL_RULE_NAME} \
- --project={PROJECT_ID} \
- --direction=INGRESS \
- --priority=100 \
- --network={NETWORK} \
- --action=ALLOW \
- --rules=tcp:{DB_PORT} \
- --source-ranges=0.0.0.0/0
"""
-DELETE_FIREWALL_RULE = f"""
+DELETE_PERSISTENT_DISK_COMMAND = f"""
if [ $AIRFLOW__API__GOOGLE_KEY_PATH ]; then \
gcloud auth activate-service-account --key-file=$AIRFLOW__API__GOOGLE_KEY_PATH; \
fi;
-gcloud compute firewall-rules delete {FIREWALL_RULE_NAME} --project={PROJECT_ID} --quiet
+gcloud compute disks delete {GCE_INSTANCE_NAME} --project={PROJECT_ID} --zone={ZONE} --quiet
"""
-CONNECTION_ID = f"mysql_{DAG_ID}_{ENV_ID}".replace("-", "_")
-
-BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
-FILE_NAME = "result.json"
-
-SQL_TABLE = "test_table"
-SQL_CREATE = f"CREATE TABLE IF NOT EXISTS {SQL_TABLE} (col_1 INT, col_2 VARCHAR(8))"
-SQL_INSERT = f"INSERT INTO {SQL_TABLE} (col_1, col_2) VALUES (1, 'one'), (2, 'two')"
-SQL_SELECT = f"SELECT * FROM {SQL_TABLE}"
-
log = logging.getLogger(__name__)
@@ -153,8 +155,8 @@ with DAG(
catchup=False,
tags=["example", "mysql", "gcs"],
) as dag:
- create_instance = ComputeEngineInsertInstanceOperator(
- task_id="create_instance",
+ create_gce_instance = ComputeEngineInsertInstanceOperator(
+ task_id="create_gce_instance",
project_id=PROJECT_ID,
zone=ZONE,
body=GCE_INSTANCE_BODY,
@@ -162,127 +164,129 @@ with DAG(
create_firewall_rule = BashOperator(
task_id="create_firewall_rule",
- bash_command=CREATE_FIREWALL_RULE,
+ bash_command=CREATE_FIREWALL_RULE_COMMAND,
)
setup_mysql = SSHOperator(
task_id="setup_mysql",
ssh_hook=ComputeEngineSSHHook(
user="username",
- instance_name=DB_INSTANCE_NAME,
+ instance_name=GCE_INSTANCE_NAME,
zone=ZONE,
project_id=PROJECT_ID,
use_oslogin=False,
use_iap_tunnel=False,
cmd_timeout=180,
),
- command=SETUP_MYSQL,
+ command=SETUP_MYSQL_COMMAND,
retries=2,
)
@task
def get_public_ip() -> str:
hook = ComputeEngineHook()
- address = hook.get_instance_address(resource_id=DB_INSTANCE_NAME, zone=ZONE, project_id=PROJECT_ID)
+ address = hook.get_instance_address(resource_id=GCE_INSTANCE_NAME, zone=ZONE, project_id=PROJECT_ID)
return address
get_public_ip_task = get_public_ip()
@task
- def setup_mysql_connection(**kwargs) -> None:
- public_ip = kwargs["ti"].xcom_pull(task_ids="get_public_ip")
+ def setup_connection(ip_address: str) -> None:
connection = Connection(
conn_id=CONNECTION_ID,
- description="Example MySQL connection",
- conn_type="mysql",
- host=public_ip,
+ description="Example connection",
+ conn_type=CONNECTION_TYPE,
+ host=ip_address,
login=DB_USER_NAME,
password=DB_USER_PASSWORD,
- schema=DB_NAME,
+ port=DB_PORT,
)
session = Session()
- if session.query(Connection).filter(Connection.conn_id == CONNECTION_ID).first():
- log.warning("Connection %s already exists", CONNECTION_ID)
- return None
+ log.info("Removing connection %s if it exists", CONNECTION_ID)
+ query = session.query(Connection).filter(Connection.conn_id == CONNECTION_ID)
+ query.delete()
session.add(connection)
session.commit()
+ log.info("Connection %s created", CONNECTION_ID)
- setup_mysql_connection_task = setup_mysql_connection()
-
- create_bucket = GCSCreateBucketOperator(
- task_id="create_bucket",
- bucket_name=BUCKET_NAME,
- )
+ setup_connection_task = setup_connection(get_public_ip_task)
create_sql_table = SQLExecuteQueryOperator(
task_id="create_sql_table",
conn_id=CONNECTION_ID,
sql=SQL_CREATE,
+ retries=4,
)
- insert_data = SQLExecuteQueryOperator(
- task_id="insert_data",
+ insert_sql_data = SQLExecuteQueryOperator(
+ task_id="insert_sql_data",
conn_id=CONNECTION_ID,
sql=SQL_INSERT,
)
+ create_gcs_bucket = GCSCreateBucketOperator(
+ task_id="create_gcs_bucket",
+ bucket_name=BUCKET_NAME,
+ )
+
# [START howto_operator_mysql_to_gcs]
- upload_mysql_to_gcs = MySQLToGCSOperator(
- task_id="mysql_to_gcs", sql=SQL_SELECT, bucket=BUCKET_NAME,
filename=FILE_NAME, export_format="csv"
+ mysql_to_gcs = MySQLToGCSOperator(
+ task_id="mysql_to_gcs",
+ mysql_conn_id=CONNECTION_ID,
+ sql=SQL_SELECT,
+ bucket=BUCKET_NAME,
+ filename=FILE_NAME,
+ export_format="csv",
)
# [END howto_operator_mysql_to_gcs]
- delete_mysql_connection = BashOperator(
- task_id="delete_mysql_connection",
- bash_command=f"airflow connections delete {CONNECTION_ID}",
+ delete_gcs_bucket = GCSDeleteBucketOperator(
+ task_id="delete_gcs_bucket",
+ bucket_name=BUCKET_NAME,
trigger_rule=TriggerRule.ALL_DONE,
)
- delete_bucket = GCSDeleteBucketOperator(
- task_id="delete_bucket",
- bucket_name=BUCKET_NAME,
+ delete_firewall_rule = BashOperator(
+ task_id="delete_firewall_rule",
+ bash_command=DELETE_FIREWALL_RULE_COMMAND,
trigger_rule=TriggerRule.ALL_DONE,
)
- delete_instance = ComputeEngineDeleteInstanceOperator(
- task_id="delete_instance",
- resource_id=DB_INSTANCE_NAME,
+ delete_gce_instance = ComputeEngineDeleteInstanceOperator(
+ task_id="delete_gce_instance",
+ resource_id=GCE_INSTANCE_NAME,
zone=ZONE,
project_id=PROJECT_ID,
trigger_rule=TriggerRule.ALL_DONE,
)
- delete_firewall_rule = BashOperator(
- task_id="delete_firewall_rule",
- bash_command=DELETE_FIREWALL_RULE,
- trigger_rule=TriggerRule.ALL_DONE,
- )
-
delete_persistent_disk = BashOperator(
task_id="delete_persistent_disk",
- bash_command=DELETE_PERSISTENT_DISK,
+ bash_command=DELETE_PERSISTENT_DISK_COMMAND,
trigger_rule=TriggerRule.ALL_DONE,
)
- (
- # TEST SETUP
- create_instance
- >> setup_mysql
- >> get_public_ip_task
- >> setup_mysql_connection_task
- >> create_firewall_rule
- >> create_sql_table
- >> insert_data
+ delete_connection = BashOperator(
+ task_id="delete_connection",
+ bash_command=f"airflow connections delete {CONNECTION_ID}",
+ trigger_rule=TriggerRule.ALL_DONE,
)
+
+ # TEST SETUP
+ create_gce_instance >> setup_mysql
+ create_gce_instance >> get_public_ip_task >> setup_connection_task
+ [setup_mysql, setup_connection_task, create_firewall_rule] >> create_sql_table >> insert_sql_data
+
(
- [insert_data, create_bucket]
+ [create_gcs_bucket, insert_sql_data]
# TEST BODY
- >> upload_mysql_to_gcs
- # TEST TEARDOWN
- >> [delete_instance, delete_bucket, delete_mysql_connection, delete_firewall_rule]
+ >> mysql_to_gcs
)
- delete_instance >> delete_persistent_disk
+
+ # TEST TEARDOWN
+ mysql_to_gcs >> [delete_gcs_bucket, delete_firewall_rule, delete_gce_instance, delete_connection]
+ delete_gce_instance >> delete_persistent_disk
from tests.system.utils.watcher import watcher
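Note on the pattern above: the new setup_connection task drops any stale connection row before inserting a fresh one, instead of skipping creation when a row already exists, so a rerun of the system test always points at the newly created instance's IP. A minimal sketch of that delete-then-create idiom, assuming Airflow's Session and Connection models (the recreate_connection helper name is hypothetical):

```
from airflow.models.connection import Connection
from airflow.settings import Session


def recreate_connection(conn_id: str, conn_type: str, host: str, port: int) -> None:
    # Hypothetical helper sketching the pattern; not part of this commit.
    session = Session()
    # Drop any stale row first so a rerun never collides on conn_id.
    session.query(Connection).filter(Connection.conn_id == conn_id).delete()
    session.add(Connection(conn_id=conn_id, conn_type=conn_type, host=host, port=port))
    session.commit()
```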
diff --git a/tests/system/providers/google/cloud/sql_to_sheets/example_sql_to_sheets.py b/tests/system/providers/google/cloud/sql_to_sheets/example_sql_to_sheets.py
index 314f388c52..811526a82a 100644
--- a/tests/system/providers/google/cloud/sql_to_sheets/example_sql_to_sheets.py
+++ b/tests/system/providers/google/cloud/sql_to_sheets/example_sql_to_sheets.py
@@ -1,4 +1,3 @@
-#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -17,51 +16,17 @@
# under the License.
"""
-First, you need a db instance that is accessible from the Airflow environment.
-You can, for example, create a Cloud SQL instance and connect to it from
-within breeze with Cloud SQL proxy:
-https://cloud.google.com/sql/docs/postgres/connect-instance-auth-proxy
-
-# DB setup
-Create db:
-```
-CREATE DATABASE test_db;
-```
-
-Switch to db:
-```
-\c test_db
-```
-
-Create table and insert some rows
-```
-CREATE TABLE test_table (col1 INT, col2 INT);
-INSERT INTO test_table VALUES (1,2), (3,4), (5,6), (7,8);
-```
-
-# Setup connections
-db connection:
-In airflow UI, set one db connection, for example "postgres_default"
-and make sure the "Test" at the bottom succeeds
-
-google cloud connection:
-We need additional scopes for this test
-scopes: https://www.googleapis.com/auth/spreadsheets, https://www.googleapis.com/auth/cloud-platform
-
-# Sheet
-Finally, you need a Google Sheet you have access to, for testing you can
-create a public sheet and get its ID.
-
-# Tear Down
-You can delete the db with
-```
-DROP DATABASE test_db;
-```
+Example DAG using SQLToGoogleSheetsOperator.
+
+This DAG relies on the following OS environment variables
+
+* AIRFLOW__API__GOOGLE_KEY_PATH - Path to service account key file. Note, you can skip this variable if you
+ run this DAG in a Composer environment.
+
"""
from __future__ import annotations
-import json
import logging
import os
from datetime import datetime
@@ -80,38 +45,50 @@ from airflow.providers.google.cloud.operators.compute import (
from airflow.providers.google.suite.operators.sheets import GoogleSheetsCreateSpreadsheetOperator
from airflow.providers.google.suite.transfers.sql_to_sheets import SQLToGoogleSheetsOperator
from airflow.providers.ssh.operators.ssh import SSHOperator
-from airflow.settings import Session
+from airflow.settings import Session, json
from airflow.utils.trigger_rule import TriggerRule
DAG_ID = "example_sql_to_sheets"
-ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
-PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "example-project")
REGION = "europe-west2"
ZONE = REGION + "-a"
NETWORK = "default"
+CONNECTION_ID = f"pg_{DAG_ID}_{ENV_ID}".replace("-", "_")
+CONNECTION_TYPE = "postgres"
-SHEETS_CONNECTION_ID = f"connection_{DAG_ID}_{ENV_ID}"
-SPREADSHEET = {
- "properties": {"title": "Test1"},
- "sheets": [{"properties": {"title": "Sheet1"}}],
-}
+BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
+FILE_NAME = "result.json"
-DB_NAME = f"{DAG_ID}-{ENV_ID}-db".replace("-", "_")
+DB_NAME = "testdb"
DB_PORT = 5432
-DB_USER_NAME = "demo_user"
+DB_USER_NAME = "root"
DB_USER_PASSWORD = "demo_password"
-DB_CONNECTION_ID = f"postgres_{DAG_ID}_{ENV_ID}".replace("-", "_")
+SETUP_POSTGRES_COMMAND = f"""
+sudo apt update &&
+sudo apt install -y docker.io &&
+sudo docker run -d -p {DB_PORT}:{DB_PORT} --name {DB_NAME} \
+ -e PGUSER={DB_USER_NAME} \
+ -e POSTGRES_USER={DB_USER_NAME} \
+ -e POSTGRES_PASSWORD={DB_USER_PASSWORD} \
+ -e POSTGRES_DB={DB_NAME} \
+ postgres
+"""
+SQL_TABLE = "test_table"
+SQL_CREATE = f"CREATE TABLE IF NOT EXISTS {SQL_TABLE} (col_1 INT, col_2 VARCHAR(8))"
+SQL_INSERT = f"INSERT INTO {SQL_TABLE} (col_1, col_2) VALUES (1, 'one'), (2, 'two')"
+SQL_SELECT = f"SELECT * FROM {SQL_TABLE}"
-SHORT_MACHINE_TYPE_NAME = "n1-standard-1"
-DB_INSTANCE_NAME = f"instance-{DAG_ID}-{ENV_ID}".replace("_", "-")
+GCE_MACHINE_TYPE = "n1-standard-1"
+GCE_INSTANCE_NAME = f"instance-{DAG_ID}-{ENV_ID}".replace("_", "-")
GCE_INSTANCE_BODY = {
- "name": DB_INSTANCE_NAME,
- "machine_type": f"zones/{ZONE}/machineTypes/{SHORT_MACHINE_TYPE_NAME}",
+ "name": GCE_INSTANCE_NAME,
+ "machine_type": f"zones/{ZONE}/machineTypes/{GCE_MACHINE_TYPE}",
"disks": [
{
"boot": True,
- "device_name": DB_INSTANCE_NAME,
+ "device_name": GCE_INSTANCE_NAME,
"initialize_params": {
"disk_size_gb": "10",
"disk_type": f"zones/{ZONE}/diskTypes/pd-balanced",
@@ -127,61 +104,60 @@ GCE_INSTANCE_BODY = {
}
],
}
-DELETE_PERSISTENT_DISK = f"""
+FIREWALL_RULE_NAME = f"allow-http-{DB_PORT}"
+CREATE_FIREWALL_RULE_COMMAND = f"""
if [ $AIRFLOW__API__GOOGLE_KEY_PATH ]; then \
gcloud auth activate-service-account --key-file=$AIRFLOW__API__GOOGLE_KEY_PATH; \
fi;
-gcloud compute disks delete {DB_INSTANCE_NAME} --project={PROJECT_ID} --zone={ZONE} --quiet
-"""
-SETUP_POSTGRES = f"""
-sudo apt update &&
-sudo apt install -y docker.io &&
-sudo docker run -d -p {DB_PORT}:{DB_PORT} --name {DB_NAME} \
- -e POSTGRES_USER={DB_USER_NAME} \
- -e POSTGRES_PASSWORD={DB_USER_PASSWORD} \
- -e POSTGRES_DB={DB_NAME} \
- postgres
+if [ -z gcloud compute firewall-rules list --filter=name:{FIREWALL_RULE_NAME} --format="value(name)" ]; then \
+ gcloud compute firewall-rules create {FIREWALL_RULE_NAME} \
+ --project={PROJECT_ID} \
+ --direction=INGRESS \
+ --priority=100 \
+ --network={NETWORK} \
+ --action=ALLOW \
+ --rules=tcp:{DB_PORT} \
+ --source-ranges=0.0.0.0/0
+else
+ echo "Firewall rule {FIREWALL_RULE_NAME} already exists."
+fi
"""
-
-FIREWALL_RULE_NAME = f"allow-http-{DB_PORT}"
-CREATE_FIREWALL_RULE = f"""
+DELETE_FIREWALL_RULE_COMMAND = f"""
if [ $AIRFLOW__API__GOOGLE_KEY_PATH ]; then \
gcloud auth activate-service-account --key-file=$AIRFLOW__API__GOOGLE_KEY_PATH; \
+fi; \
+if [ gcloud compute firewall-rules list --filter=name:{FIREWALL_RULE_NAME} --format="value(name)" ]; then \
+ gcloud compute firewall-rules delete {FIREWALL_RULE_NAME} --project={PROJECT_ID} --quiet; \
fi;
-gcloud compute firewall-rules create {FIREWALL_RULE_NAME} \
- --project={PROJECT_ID} \
- --direction=INGRESS \
- --priority=100 \
- --network={NETWORK} \
- --action=ALLOW \
- --rules=tcp:{DB_PORT} \
- --source-ranges=0.0.0.0/0
"""
-DELETE_FIREWALL_RULE = f"""
+DELETE_PERSISTENT_DISK_COMMAND = f"""
if [ $AIRFLOW__API__GOOGLE_KEY_PATH ]; then \
gcloud auth activate-service-account --key-file=$AIRFLOW__API__GOOGLE_KEY_PATH; \
fi;
-gcloud compute firewall-rules delete {FIREWALL_RULE_NAME} --project={PROJECT_ID} --quiet
+
+gcloud compute disks delete {GCE_INSTANCE_NAME} --project={PROJECT_ID} --zone={ZONE} --quiet
"""
-SQL_TABLE = "test_table"
-SQL_CREATE = f"CREATE TABLE IF NOT EXISTS {SQL_TABLE} (col_1 INT, col_2 VARCHAR(8))"
-SQL_INSERT = f"INSERT INTO {SQL_TABLE} (col_1, col_2) VALUES (1, 'one'), (2, 'two')"
-SQL_SELECT = f"SELECT * FROM {SQL_TABLE}"
+SHEETS_CONNECTION_ID = f"connection_{DAG_ID}_{ENV_ID}"
+SPREADSHEET = {
+ "properties": {"title": "Test1"},
+ "sheets": [{"properties": {"title": "Sheet1"}}],
+}
+
log = logging.getLogger(__name__)
with DAG(
- DAG_ID,
+ dag_id=DAG_ID,
+ schedule="@once",
start_date=datetime(2021, 1, 1),
- schedule="@once", # Override to match your needs
catchup=False,
- tags=["example", "sql"],
+ tags=["example", "postgres", "gcs"],
) as dag:
- create_instance = ComputeEngineInsertInstanceOperator(
- task_id="create_instance",
+ create_gce_instance = ComputeEngineInsertInstanceOperator(
+ task_id="create_gce_instance",
project_id=PROJECT_ID,
zone=ZONE,
body=GCE_INSTANCE_BODY,
@@ -189,55 +165,67 @@ with DAG(
create_firewall_rule = BashOperator(
task_id="create_firewall_rule",
- bash_command=CREATE_FIREWALL_RULE,
+ bash_command=CREATE_FIREWALL_RULE_COMMAND,
)
setup_postgres = SSHOperator(
task_id="setup_postgres",
ssh_hook=ComputeEngineSSHHook(
user="username",
- instance_name=DB_INSTANCE_NAME,
+ instance_name=GCE_INSTANCE_NAME,
zone=ZONE,
project_id=PROJECT_ID,
use_oslogin=False,
use_iap_tunnel=False,
cmd_timeout=180,
),
- command=SETUP_POSTGRES,
+ command=SETUP_POSTGRES_COMMAND,
retries=2,
- retry_delay=30,
)
@task
def get_public_ip() -> str:
hook = ComputeEngineHook()
- address = hook.get_instance_address(resource_id=DB_INSTANCE_NAME, zone=ZONE, project_id=PROJECT_ID)
+ address = hook.get_instance_address(resource_id=GCE_INSTANCE_NAME, zone=ZONE, project_id=PROJECT_ID)
return address
get_public_ip_task = get_public_ip()
@task
- def setup_postgres_connection(**kwargs) -> None:
- public_ip = kwargs["ti"].xcom_pull(task_ids="get_public_ip")
+ def setup_connection(ip_address: str) -> None:
connection = Connection(
- conn_id=DB_CONNECTION_ID,
- description="Example PostgreSQL connection",
- conn_type="postgres",
- host=public_ip,
+ conn_id=CONNECTION_ID,
+ description="Example connection",
+ conn_type=CONNECTION_TYPE,
+ schema=DB_NAME,
+ host=ip_address,
login=DB_USER_NAME,
password=DB_USER_PASSWORD,
- schema=DB_NAME,
port=DB_PORT,
)
session = Session()
- if session.query(Connection).filter(Connection.conn_id == DB_CONNECTION_ID).first():
- log.warning("Connection %s already exists", DB_CONNECTION_ID)
- return None
+ log.info("Removing connection %s if it exists", CONNECTION_ID)
+ query = session.query(Connection).filter(Connection.conn_id == CONNECTION_ID)
+ query.delete()
session.add(connection)
session.commit()
+ log.info("Connection %s created", CONNECTION_ID)
- setup_postgres_connection_task = setup_postgres_connection()
+ setup_connection_task = setup_connection(get_public_ip_task)
+
+ create_sql_table = SQLExecuteQueryOperator(
+ task_id="create_sql_table",
+ conn_id=CONNECTION_ID,
+ sql=SQL_CREATE,
+ retries=4,
+ )
+
+ insert_sql_data = SQLExecuteQueryOperator(
+ task_id="insert_sql_data",
+ conn_id=CONNECTION_ID,
+ sql=SQL_INSERT,
+ )
@task
def setup_sheets_connection():
@@ -259,18 +247,6 @@ with DAG(
setup_sheets_connection_task = setup_sheets_connection()
- create_sql_table = SQLExecuteQueryOperator(
- task_id="create_sql_table",
- conn_id=DB_CONNECTION_ID,
- sql=SQL_CREATE,
- )
-
- insert_data = SQLExecuteQueryOperator(
- task_id="insert_data",
- conn_id=DB_CONNECTION_ID,
- sql=SQL_INSERT,
- )
-
create_spreadsheet = GoogleSheetsCreateSpreadsheetOperator(
task_id="create_spreadsheet", spreadsheet=SPREADSHEET,
gcp_conn_id=SHEETS_CONNECTION_ID
)
@@ -279,7 +255,7 @@ with DAG(
upload_sql_to_sheet = SQLToGoogleSheetsOperator(
task_id="upload_sql_to_sheet",
sql=SQL_SELECT,
- sql_conn_id=DB_CONNECTION_ID,
+ sql_conn_id=CONNECTION_ID,
database=DB_NAME,
spreadsheet_id="{{
task_instance.xcom_pull(task_ids='create_spreadsheet', "
"key='spreadsheet_id') }}",
@@ -287,54 +263,57 @@ with DAG(
)
# [END upload_sql_to_sheets]
- delete_postgres_connection = BashOperator(
- task_id="delete_postgres_connection",
- bash_command=f"airflow connections delete {DB_CONNECTION_ID}",
- trigger_rule=TriggerRule.ALL_DONE,
- )
-
- delete_sheets_connection = BashOperator(
- task_id="delete_temp_sheets_connection",
- bash_command=f"airflow connections delete {SHEETS_CONNECTION_ID}",
+ delete_firewall_rule = BashOperator(
+ task_id="delete_firewall_rule",
+ bash_command=DELETE_FIREWALL_RULE_COMMAND,
trigger_rule=TriggerRule.ALL_DONE,
)
- delete_instance = ComputeEngineDeleteInstanceOperator(
- task_id="delete_instance",
- resource_id=DB_INSTANCE_NAME,
+ delete_gce_instance = ComputeEngineDeleteInstanceOperator(
+ task_id="delete_gce_instance",
+ resource_id=GCE_INSTANCE_NAME,
zone=ZONE,
project_id=PROJECT_ID,
trigger_rule=TriggerRule.ALL_DONE,
)
- delete_firewall_rule = BashOperator(
- task_id="delete_firewall_rule",
- bash_command=DELETE_FIREWALL_RULE,
+ delete_persistent_disk = BashOperator(
+ task_id="delete_persistent_disk",
+ bash_command=DELETE_PERSISTENT_DISK_COMMAND,
trigger_rule=TriggerRule.ALL_DONE,
)
- delete_persistent_disk = BashOperator(
- task_id="delete_persistent_disk",
- bash_command=DELETE_PERSISTENT_DISK,
+ delete_connection = BashOperator(
+ task_id="delete_connection",
+ bash_command=f"airflow connections delete {CONNECTION_ID}",
trigger_rule=TriggerRule.ALL_DONE,
)
- # TEST SETUP
- create_instance >> setup_postgres
- (create_instance >> get_public_ip_task >> setup_postgres_connection_task)
- (
- [setup_postgres, setup_postgres_connection_task, create_firewall_rule]
- >> create_sql_table
- >> insert_data
+ delete_sheets_connection = BashOperator(
+ task_id="delete_sheets_connection",
+ bash_command=f"airflow connections delete {SHEETS_CONNECTION_ID}",
+ trigger_rule=TriggerRule.ALL_DONE,
)
+ # TEST SETUP
+ create_gce_instance >> setup_postgres
+ create_gce_instance >> get_public_ip_task >> setup_connection_task
+ [setup_postgres, setup_connection_task, create_firewall_rule] >> create_sql_table >> insert_sql_data
+ setup_sheets_connection_task >> create_spreadsheet
+
(
- [insert_data, setup_sheets_connection_task >> create_spreadsheet]
+ [create_spreadsheet, insert_sql_data]
# TEST BODY
>> upload_sql_to_sheet
- # TEST TEARDOWN
- >> [delete_instance, delete_postgres_connection, delete_sheets_connection, delete_firewall_rule]
)
- delete_instance >> delete_persistent_disk
+
+ # TEST TEARDOWN
+ upload_sql_to_sheet >> [
+ delete_firewall_rule,
+ delete_gce_instance,
+ delete_connection,
+ delete_sheets_connection,
+ ]
+ delete_gce_instance >> delete_persistent_disk
from tests.system.utils.watcher import watcher
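Note on the wiring above: setup_connection(get_public_ip_task) passes the return value of one @task straight into another, so Airflow resolves it from XCom at runtime and creates the dependency implicitly, replacing the old kwargs["ti"].xcom_pull(task_ids="get_public_ip") call. A self-contained sketch of that TaskFlow idiom, with an invented dag_id and a placeholder IP:

```
from datetime import datetime

from airflow.decorators import task
from airflow.models.dag import DAG

with DAG(dag_id="taskflow_sketch", schedule="@once", start_date=datetime(2021, 1, 1), catchup=False):

    @task
    def get_public_ip() -> str:
        return "203.0.113.10"  # placeholder; the real task queries ComputeEngineHook

    @task
    def setup_connection(ip_address: str) -> None:
        print(f"creating connection for {ip_address}")

    # Passing the upstream output both wires get_public_ip >> setup_connection
    # and performs the XCom pull when setup_connection runs.
    setup_connection(get_public_ip())
```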
diff --git a/tests/system/providers/google/cloud/transfers/example_postgres_to_gcs.py b/tests/system/providers/google/cloud/transfers/example_postgres_to_gcs.py
index c3994f1462..605a67dfa4 100644
--- a/tests/system/providers/google/cloud/transfers/example_postgres_to_gcs.py
+++ b/tests/system/providers/google/cloud/transfers/example_postgres_to_gcs.py
@@ -1,4 +1,3 @@
-#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -15,8 +14,9 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+
"""
-Example DAG using PostgresToGCSOperator.
+Example DAG using PostgresSQLToGCSOperator.
This DAG relies on the following OS environment variables
@@ -31,7 +31,7 @@ import os
from datetime import datetime
from airflow.decorators import task
-from airflow.models.connection import Connection
+from airflow.models import Connection
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
@@ -57,21 +57,40 @@ PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "example-project")
REGION = "europe-west2"
ZONE = REGION + "-a"
NETWORK = "default"
+CONNECTION_ID = f"pg_{DAG_ID}_{ENV_ID}".replace("-", "_")
+CONNECTION_TYPE = "postgres"
+
+BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
+FILE_NAME = "result.json"
DB_NAME = "testdb"
DB_PORT = 5432
-DB_USER_NAME = "demo_user"
+DB_USER_NAME = "root"
DB_USER_PASSWORD = "demo_password"
+SETUP_POSTGRES_COMMAND = f"""
+sudo apt update &&
+sudo apt install -y docker.io &&
+sudo docker run -d -p {DB_PORT}:{DB_PORT} --name {DB_NAME} \
+ -e PGUSER={DB_USER_NAME} \
+ -e POSTGRES_USER={DB_USER_NAME} \
+ -e POSTGRES_PASSWORD={DB_USER_PASSWORD} \
+ -e POSTGRES_DB={DB_NAME} \
+ postgres
+"""
+SQL_TABLE = "test_table"
+SQL_CREATE = f"CREATE TABLE IF NOT EXISTS {SQL_TABLE} (col_1 INT, col_2 VARCHAR(8))"
+SQL_INSERT = f"INSERT INTO {SQL_TABLE} (col_1, col_2) VALUES (1, 'one'), (2, 'two')"
+SQL_SELECT = f"SELECT * FROM {SQL_TABLE}"
-SHORT_MACHINE_TYPE_NAME = "n1-standard-1"
-DB_INSTANCE_NAME = f"instance-{DAG_ID}-{ENV_ID}".replace("_", "-")
+GCE_MACHINE_TYPE = "n1-standard-1"
+GCE_INSTANCE_NAME = f"instance-{DAG_ID}-{ENV_ID}".replace("_", "-")
GCE_INSTANCE_BODY = {
- "name": DB_INSTANCE_NAME,
- "machine_type": f"zones/{ZONE}/machineTypes/{SHORT_MACHINE_TYPE_NAME}",
+ "name": GCE_INSTANCE_NAME,
+ "machine_type": f"zones/{ZONE}/machineTypes/{GCE_MACHINE_TYPE}",
"disks": [
{
"boot": True,
- "device_name": DB_INSTANCE_NAME,
+ "device_name": GCE_INSTANCE_NAME,
"initialize_params": {
"disk_size_gb": "10",
"disk_type": f"zones/{ZONE}/diskTypes/pd-balanced",
@@ -87,57 +106,41 @@ GCE_INSTANCE_BODY = {
}
],
}
-DELETE_PERSISTENT_DISK = f"""
+FIREWALL_RULE_NAME = f"allow-http-{DB_PORT}"
+CREATE_FIREWALL_RULE_COMMAND = f"""
if [ $AIRFLOW__API__GOOGLE_KEY_PATH ]; then \
gcloud auth activate-service-account --key-file=$AIRFLOW__API__GOOGLE_KEY_PATH; \
fi;
-gcloud compute disks delete {DB_INSTANCE_NAME} --project={PROJECT_ID} --zone={ZONE} --quiet
-"""
-
-SETUP_POSTGRES = f"""
-sudo apt update &&
-sudo apt install -y docker.io &&
-sudo docker run -d -p {DB_PORT}:{DB_PORT} --name {DB_NAME} \
- -e POSTGRES_USER={DB_USER_NAME} \
- -e POSTGRES_PASSWORD={DB_USER_PASSWORD} \
- -e POSTGRES_DB={DB_NAME} \
- postgres
+if [ -z gcloud compute firewall-rules list --filter=name:{FIREWALL_RULE_NAME} --format="value(name)" ]; then \
+ gcloud compute firewall-rules create {FIREWALL_RULE_NAME} \
+ --project={PROJECT_ID} \
+ --direction=INGRESS \
+ --priority=100 \
+ --network={NETWORK} \
+ --action=ALLOW \
+ --rules=tcp:{DB_PORT} \
+ --source-ranges=0.0.0.0/0
+else
+ echo "Firewall rule {FIREWALL_RULE_NAME} already exists."
+fi
"""
-
-FIREWALL_RULE_NAME = f"allow-http-{DB_PORT}"
-CREATE_FIREWALL_RULE = f"""
+DELETE_FIREWALL_RULE_COMMAND = f"""
if [ $AIRFLOW__API__GOOGLE_KEY_PATH ]; then \
gcloud auth activate-service-account --key-file=$AIRFLOW__API__GOOGLE_KEY_PATH; \
+fi; \
+if [ gcloud compute firewall-rules list --filter=name:{FIREWALL_RULE_NAME} --format="value(name)" ]; then \
+ gcloud compute firewall-rules delete {FIREWALL_RULE_NAME} --project={PROJECT_ID} --quiet; \
fi;
-
-gcloud compute firewall-rules create {FIREWALL_RULE_NAME} \
- --project={PROJECT_ID} \
- --direction=INGRESS \
- --priority=100 \
- --network={NETWORK} \
- --action=ALLOW \
- --rules=tcp:{DB_PORT} \
- --source-ranges=0.0.0.0/0
"""
-DELETE_FIREWALL_RULE = f"""
+DELETE_PERSISTENT_DISK_COMMAND = f"""
if [ $AIRFLOW__API__GOOGLE_KEY_PATH ]; then \
gcloud auth activate-service-account --key-file=$AIRFLOW__API__GOOGLE_KEY_PATH; \
fi;
-gcloud compute firewall-rules delete {FIREWALL_RULE_NAME} --project={PROJECT_ID} --quiet
+gcloud compute disks delete {GCE_INSTANCE_NAME} --project={PROJECT_ID} --zone={ZONE} --quiet
"""
-CONNECTION_ID = f"postgres_{DAG_ID}_{ENV_ID}".replace("-", "_")
-
-BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
-FILE_NAME = "result.json"
-
-SQL_TABLE = "test_table"
-SQL_CREATE = f"CREATE TABLE IF NOT EXISTS {SQL_TABLE} (col_1 INT, col_2 VARCHAR(8))"
-SQL_INSERT = f"INSERT INTO {SQL_TABLE} (col_1, col_2) VALUES (1, 'one'), (2, 'two')"
-SQL_SELECT = f"SELECT * FROM {SQL_TABLE}"
-
log = logging.getLogger(__name__)
@@ -149,8 +152,8 @@ with DAG(
catchup=False,
tags=["example", "postgres", "gcs"],
) as dag:
- create_instance = ComputeEngineInsertInstanceOperator(
- task_id="create_instance",
+ create_gce_instance = ComputeEngineInsertInstanceOperator(
+ task_id="create_gce_instance",
project_id=PROJECT_ID,
zone=ZONE,
body=GCE_INSTANCE_BODY,
@@ -158,132 +161,130 @@ with DAG(
create_firewall_rule = BashOperator(
task_id="create_firewall_rule",
- bash_command=CREATE_FIREWALL_RULE,
+ bash_command=CREATE_FIREWALL_RULE_COMMAND,
)
setup_postgres = SSHOperator(
task_id="setup_postgres",
ssh_hook=ComputeEngineSSHHook(
user="username",
- instance_name=DB_INSTANCE_NAME,
+ instance_name=GCE_INSTANCE_NAME,
zone=ZONE,
project_id=PROJECT_ID,
use_oslogin=False,
use_iap_tunnel=False,
cmd_timeout=180,
),
- command=SETUP_POSTGRES,
+ command=SETUP_POSTGRES_COMMAND,
retries=2,
- retry_delay=30,
)
@task
def get_public_ip() -> str:
hook = ComputeEngineHook()
- address = hook.get_instance_address(resource_id=DB_INSTANCE_NAME, zone=ZONE, project_id=PROJECT_ID)
+ address = hook.get_instance_address(resource_id=GCE_INSTANCE_NAME, zone=ZONE, project_id=PROJECT_ID)
return address
get_public_ip_task = get_public_ip()
@task
- def setup_postgres_connection(**kwargs) -> None:
- public_ip = kwargs["ti"].xcom_pull(task_ids="get_public_ip")
+ def setup_connection(ip_address: str) -> None:
connection = Connection(
conn_id=CONNECTION_ID,
- description="Example PostgreSQL connection",
- conn_type="postgres",
- host=public_ip,
+ description="Example connection",
+ conn_type=CONNECTION_TYPE,
+ schema=DB_NAME,
+ host=ip_address,
login=DB_USER_NAME,
password=DB_USER_PASSWORD,
- schema=DB_NAME,
port=DB_PORT,
)
session = Session()
- if session.query(Connection).filter(Connection.conn_id == CONNECTION_ID).first():
- log.warning("Connection %s already exists", CONNECTION_ID)
- return None
+ log.info("Removing connection %s if it exists", CONNECTION_ID)
+ query = session.query(Connection).filter(Connection.conn_id == CONNECTION_ID)
+ query.delete()
session.add(connection)
session.commit()
+ log.info("Connection %s created", CONNECTION_ID)
- setup_postgres_connection_task = setup_postgres_connection()
-
- create_bucket = GCSCreateBucketOperator(
- task_id="create_bucket",
- bucket_name=BUCKET_NAME,
- )
+ setup_connection_task = setup_connection(get_public_ip_task)
create_sql_table = SQLExecuteQueryOperator(
task_id="create_sql_table",
conn_id=CONNECTION_ID,
sql=SQL_CREATE,
+ retries=4,
)
- insert_data = SQLExecuteQueryOperator(
- task_id="insert_data",
+ insert_sql_data = SQLExecuteQueryOperator(
+ task_id="insert_sql_data",
conn_id=CONNECTION_ID,
sql=SQL_INSERT,
)
+ create_gcs_bucket = GCSCreateBucketOperator(
+ task_id="create_gcs_bucket",
+ bucket_name=BUCKET_NAME,
+ )
+
# [START howto_operator_postgres_to_gcs]
- get_data = PostgresToGCSOperator(
- task_id="get_data",
+ postgres_to_gcs = PostgresToGCSOperator(
+ task_id="postgres_to_gcs",
postgres_conn_id=CONNECTION_ID,
sql=SQL_SELECT,
bucket=BUCKET_NAME,
filename=FILE_NAME,
- gzip=False,
+ export_format="csv",
)
# [END howto_operator_postgres_to_gcs]
- delete_postgres_connection = BashOperator(
- task_id="delete_postgres_connection",
- bash_command=f"airflow connections delete {CONNECTION_ID}",
+ delete_gcs_bucket = GCSDeleteBucketOperator(
+ task_id="delete_gcs_bucket",
+ bucket_name=BUCKET_NAME,
trigger_rule=TriggerRule.ALL_DONE,
)
- delete_bucket = GCSDeleteBucketOperator(
- task_id="delete_bucket",
- bucket_name=BUCKET_NAME,
+ delete_firewall_rule = BashOperator(
+ task_id="delete_firewall_rule",
+ bash_command=DELETE_FIREWALL_RULE_COMMAND,
trigger_rule=TriggerRule.ALL_DONE,
)
- delete_instance = ComputeEngineDeleteInstanceOperator(
- task_id="delete_instance",
- resource_id=DB_INSTANCE_NAME,
+ delete_gce_instance = ComputeEngineDeleteInstanceOperator(
+ task_id="delete_gce_instance",
+ resource_id=GCE_INSTANCE_NAME,
zone=ZONE,
project_id=PROJECT_ID,
trigger_rule=TriggerRule.ALL_DONE,
)
- delete_firewall_rule = BashOperator(
- task_id="delete_firewall_rule",
- bash_command=DELETE_FIREWALL_RULE,
+ delete_persistent_disk = BashOperator(
+ task_id="delete_persistent_disk",
+ bash_command=DELETE_PERSISTENT_DISK_COMMAND,
trigger_rule=TriggerRule.ALL_DONE,
)
- delete_persistent_disk = BashOperator(
- task_id="delete_persistent_disk",
- bash_command=DELETE_PERSISTENT_DISK,
+ delete_connection = BashOperator(
+ task_id="delete_connection",
+ bash_command=f"airflow connections delete {CONNECTION_ID}",
trigger_rule=TriggerRule.ALL_DONE,
)
# TEST SETUP
- create_instance >> setup_postgres
- (create_instance >> get_public_ip_task >> setup_postgres_connection_task)
- (
- [setup_postgres, setup_postgres_connection_task, create_firewall_rule]
- >> create_sql_table
- >> insert_data
- )
+ create_gce_instance >> setup_postgres
+ create_gce_instance >> get_public_ip_task >> setup_connection_task
+ [setup_postgres, setup_connection_task, create_firewall_rule] >> create_sql_table >> insert_sql_data
+
(
- [insert_data, create_bucket]
+ [create_gcs_bucket, insert_sql_data]
# TEST BODY
- >> get_data
- # TEST TEARDOWN
- >> [delete_instance, delete_bucket, delete_postgres_connection, delete_firewall_rule]
+ >> postgres_to_gcs
)
- delete_instance >> delete_persistent_disk
+
+ # TEST TEARDOWN
+ postgres_to_gcs >> [delete_gcs_bucket, delete_firewall_rule, delete_gce_instance, delete_connection]
+ delete_gce_instance >> delete_persistent_disk
from tests.system.utils.watcher import watcher
@@ -291,7 +292,6 @@ with DAG(
# when "tearDown" task with trigger rule is part of the DAG
list(dag.tasks) >> watcher()
-
from tests.system.utils import get_test_run # noqa: E402
# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
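Note on the watcher import that closes each of these files: the teardown tasks run with TriggerRule.ALL_DONE, so a failed test body could otherwise still end as a successful DAG run; wiring every task into a watcher task that fails on any upstream failure propagates the error. A sketch of how such a watcher is typically defined, mirroring tests.system.utils.watcher (treat it as illustrative rather than the exact helper):

```
from airflow.decorators import task
from airflow.exceptions import AirflowException
from airflow.utils.trigger_rule import TriggerRule


@task(trigger_rule=TriggerRule.ONE_FAILED, retries=0)
def watcher():
    # Runs only when some upstream task failed; raising marks the DAG run failed.
    raise AirflowException("Failing task because one or more upstream tasks failed.")


# Usage at module level in the examples above:
# list(dag.tasks) >> watcher()
```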