This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 1b04430464 Neptune system test tweaks (#37921)
1b04430464 is described below

commit 1b044304648157cc01d029416732b52645164dca
Author: D. Ferruzzi <[email protected]>
AuthorDate: Wed Mar 6 03:17:17 2024 -0800

    Neptune system test tweaks (#37921)
    
    * Neptune system test tweaks
    
    Some small adjustments were needed to make this run for our dashboard.
    
    * whitespace adjustment
---
 .../system/providers/amazon/aws/example_neptune.py | 44 ++++++++++++++++------
 1 file changed, 32 insertions(+), 12 deletions(-)

diff --git a/tests/system/providers/amazon/aws/example_neptune.py b/tests/system/providers/amazon/aws/example_neptune.py
index fc9d4226b5..81276c2dcd 100644
--- a/tests/system/providers/amazon/aws/example_neptune.py
+++ b/tests/system/providers/amazon/aws/example_neptune.py
@@ -16,10 +16,12 @@
 # under the License.
 from __future__ import annotations
 
-import pendulum
+from datetime import datetime
 
+from airflow.decorators import task
 from airflow.models.baseoperator import chain
 from airflow.models.dag import DAG
+from airflow.providers.amazon.aws.hooks.neptune import NeptuneHook
 from airflow.providers.amazon.aws.operators.neptune import (
     NeptuneStartDbClusterOperator,
     NeptuneStopDbClusterOperator,
@@ -27,34 +29,52 @@ from airflow.providers.amazon.aws.operators.neptune import (
 from tests.system.providers.amazon.aws.utils import SystemTestContextBuilder
 
 DAG_ID = "example_neptune"
-# This test requires an existing Neptune cluster.
-CLUSTER_ID = "CLUSTER_ID"
 
-sys_test_context_task = SystemTestContextBuilder().add_variable(CLUSTER_ID).build()
+sys_test_context_task = SystemTestContextBuilder().build()
 
-with DAG(DAG_ID, schedule="@once", start_date=pendulum.datetime(2024, 1, 1, tz="UTC"), catchup=False) as dag:
+
+@task
+def create_cluster(cluster_id):
+    hook = NeptuneHook()
+    hook.conn.create_db_cluster(DBClusterIdentifier=cluster_id, Engine="neptune", DeletionProtection=False)
+    hook.wait_for_cluster_availability(cluster_id=cluster_id)
+
+
+@task
+def delete_cluster(cluster_id):
+    hook = NeptuneHook()
+    hook.conn.delete_db_cluster(DBClusterIdentifier=cluster_id, SkipFinalSnapshot=True)
+
+
+with DAG(
+    dag_id=DAG_ID,
+    start_date=datetime(2021, 1, 1),
+    schedule="@once",
+    catchup=False,
+    tags=["example"],
+) as dag:
     test_context = sys_test_context_task()
+
     env_id = test_context["ENV_ID"]
-    cluster_id = test_context["CLUSTER_ID"]
+    cluster_id = f"{env_id}-cluster"
 
     # [START howto_operator_start_neptune_cluster]
-    start_cluster = NeptuneStartDbClusterOperator(
-        task_id="start_task", db_cluster_id=cluster_id, deferrable=True
-    )
+    start_cluster = NeptuneStartDbClusterOperator(task_id="start_task", db_cluster_id=cluster_id)
     # [END howto_operator_start_neptune_cluster]
 
     # [START howto_operator_stop_neptune_cluster]
-    stop_cluster = NeptuneStopDbClusterOperator(
-        task_id="stop_task", db_cluster_id=cluster_id, deferrable=True
-    )
+    stop_cluster = NeptuneStopDbClusterOperator(task_id="stop_task", db_cluster_id=cluster_id)
     # [END howto_operator_stop_neptune_cluster]
 
     chain(
         # TEST SETUP
         test_context,
+        create_cluster(cluster_id),
         # TEST BODY
         start_cluster,
         stop_cluster,
+        # TEST TEARDOWN
+        delete_cluster(cluster_id),
     )
     from tests.system.utils.watcher import watcher
 

Reply via email to